diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b8d32843b831b2b4b3cf3e1f5aae5c0faa34cdf2..98c1285c974ddc876cf8ff4f8223c96b4cf080d2 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -33,10 +33,10 @@ #include "tlist.h" #include "tlockfree.h" #include "tlosertree.h" +#include "tlrucache.h" #include "tmallocator.h" #include "tmsgcb.h" #include "tskiplist.h" -#include "tlrucache.h" #include "tstream.h" #include "ttime.h" #include "ttimer.h" @@ -88,7 +88,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); -int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids); +int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); @@ -107,7 +107,7 @@ int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader); int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData); void* metaGetIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta); -int metaTtlSmaller(SMeta *pMeta, uint64_t time, SArray *uidList); +int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); @@ -183,9 +183,9 @@ typedef struct { // SVState struct SVState { - // int64_t processed; int64_t committed; int64_t applied; + int64_t commitID; }; struct SVnodeInfo { diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 445a4c5ae924bb2010f849c8dd0360f5d34a3ac9..f029f8a820d9eb59b521d7675972b225c75280fc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -18,6 +18,7 @@ typedef struct { STsdb *pTsdb; /* commit data */ + int64_t commitID; int32_t minutes; int8_t precision; int32_t minRow; @@ -837,6 +838,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { // unlock(); pCommitter->pTsdb = pTsdb; + pCommitter->commitID = pTsdb->pVnode->state.commitID; pCommitter->minutes = pTsdb->keepCfg.days; pCommitter->precision = pTsdb->keepCfg.precision; pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 3d9d2d8efbdd5bd5d54afa554a1ddcba6bdc4c59..6aa51cde8e6427d5a701a95c0e6e314515121309 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -40,6 +40,7 @@ int vnodeBegin(SVnode *pVnode) { /* pthread_mutex_unlock(); */ + pVnode->state.commitID++; // begin meta if (metaBegin(pVnode->pMeta) < 0) { vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno)); @@ -210,7 +211,8 @@ int vnodeCommit(SVnode *pVnode) { SVnodeInfo info = {0}; char dir[TSDB_FILENAME_LEN]; - vInfo("vgId:%d, start to commit, version: %" PRId64, TD_VID(pVnode), pVnode->state.applied); + vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, + pVnode->state.applied); pVnode->onCommit = pVnode->inUse; pVnode->inUse = NULL; @@ -278,7 +280,7 @@ static int vnodeCommitImpl(void *arg) { // metaCommit(pVnode->pMeta); tqCommit(pVnode->pTq); - tsdbCommit(pVnode->pTsdb); + // tsdbCommit(pVnode->pTsdb, ); // vnodeBufPoolRecycle(pVnode); tsem_post(&(pVnode->canCommit)); @@ -302,6 +304,7 @@ static int vnodeEncodeState(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "applied version", pState->applied) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0) return -1; return 0; } @@ -314,6 +317,8 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) { if (code < 0) return -1; tjsonGetNumberValue(pJson, "applied version", pState->applied, code); if (code < 0) return -1; + tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code); + if (code < 0) return -1; return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index dfc258b42b75bb3b5da4012a4f44ed0c7d337f05..ba646ef44db53cbbb7b9eb126bd5bdb814efc2d4 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -37,6 +37,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { info.config = *pCfg; info.state.committed = -1; info.state.applied = -1; + info.state.commitID = 0; if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) { vError("vgId:%d, failed to save vnode config since %s", pCfg->vgId, tstrerror(terrno)); @@ -79,6 +80,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->config = info.config; pVnode->state.committed = info.state.committed; pVnode->state.applied = info.state.committed; + pVnode->state.commitID = info.state.commitID; pVnode->pTfs = pTfs; pVnode->msgCb = msgCb; pVnode->syncCount = 0;