提交 474aeb44 编写于 作者: S Shungang Li

add ttlExpireTime to vnode.json/state

上级 14b8a312
...@@ -347,6 +347,7 @@ struct SVState { ...@@ -347,6 +347,7 @@ struct SVState {
int64_t applyTerm; int64_t applyTerm;
int64_t commitID; int64_t commitID;
int64_t commitTerm; int64_t commitTerm;
int64_t ttlExpireTime;
}; };
struct SVStatis { struct SVStatis {
......
...@@ -42,6 +42,8 @@ static int32_t ttlMgrULock(STtlManger *pTtlMgr); ...@@ -42,6 +42,8 @@ static int32_t ttlMgrULock(STtlManger *pTtlMgr);
const char *ttlTbname = "ttl.idx"; const char *ttlTbname = "ttl.idx";
const char *ttlV1Tbname = "ttlv1.idx"; const char *ttlV1Tbname = "ttlv1.idx";
#define TTL_EXPIRE_TIME_UNINIT -1
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix, int32_t flushThreshold) { int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix, int32_t flushThreshold) {
int ret = TSDB_CODE_SUCCESS; int ret = TSDB_CODE_SUCCESS;
int64_t startNs = taosGetTimestampNs(); int64_t startNs = taosGetTimestampNs();
...@@ -59,7 +61,7 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo ...@@ -59,7 +61,7 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo
strcpy(logBuffer, logPrefix); strcpy(logBuffer, logPrefix);
pTtlMgr->logPrefix = logBuffer; pTtlMgr->logPrefix = logBuffer;
pTtlMgr->flushThreshold = flushThreshold; pTtlMgr->flushThreshold = flushThreshold;
pTtlMgr->expireTimeMs = 0; // TODO(LSG) pTtlMgr->expireTimeMs = TTL_EXPIRE_TIME_UNINIT;
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback); ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
if (ret < 0) { if (ret < 0) {
...@@ -370,9 +372,9 @@ _out: ...@@ -370,9 +372,9 @@ _out:
int ttlMgrFindExpired(STtlManger *pTtlMgr, SArray *pTbUids) { int ttlMgrFindExpired(STtlManger *pTtlMgr, SArray *pTbUids) {
ttlMgrRLock(pTtlMgr); ttlMgrRLock(pTtlMgr);
if (pTtlMgr->expireTimeMs == 0) { if (pTtlMgr->expireTimeMs == TTL_EXPIRE_TIME_UNINIT) {
metaError("%s, ttl mgr expireTimeMs uninitialized, skip find expired", pTtlMgr->logPrefix); metaError("%s, ttl mgr expireTimeMs uninitialized, skip find expired", pTtlMgr->logPrefix);
return 0; goto _out;
} }
TBC *pCur; TBC *pCur;
......
...@@ -291,6 +291,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { ...@@ -291,6 +291,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
pInfo->info.state.committed = pVnode->state.applied; pInfo->info.state.committed = pVnode->state.applied;
pInfo->info.state.commitTerm = pVnode->state.applyTerm; pInfo->info.state.commitTerm = pVnode->state.applyTerm;
pInfo->info.state.commitID = ++pVnode->state.commitID; pInfo->info.state.commitID = ++pVnode->state.commitID;
pInfo->info.state.ttlExpireTime = pVnode->state.ttlExpireTime;
pInfo->pVnode = pVnode; pInfo->pVnode = pVnode;
pInfo->txn = metaGetTxn(pVnode->pMeta); pInfo->txn = metaGetTxn(pVnode->pMeta);
...@@ -572,6 +573,7 @@ static int vnodeEncodeState(const void *pObj, SJson *pJson) { ...@@ -572,6 +573,7 @@ static int vnodeEncodeState(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "ttl expire time", pState->ttlExpireTime) < 0) return -1;
return 0; return 0;
} }
...@@ -586,6 +588,8 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) { ...@@ -586,6 +588,8 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) {
if (code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code); tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
if (code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "ttl expire time", pState->ttlExpireTime, code);
if (code < 0) return -1;
return 0; return 0;
} }
......
...@@ -60,6 +60,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs ...@@ -60,6 +60,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs
info.state.committed = -1; info.state.committed = -1;
info.state.applied = -1; info.state.applied = -1;
info.state.commitID = 0; info.state.commitID = 0;
info.state.ttlExpireTime = 0;
vInfo("vgId:%d, save config while create", info.config.vgId); vInfo("vgId:%d, save config while create", info.config.vgId);
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir) < 0) { if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir) < 0) {
...@@ -364,6 +365,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC ...@@ -364,6 +365,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
pVnode->state.commitID = info.state.commitID; pVnode->state.commitID = info.state.commitID;
pVnode->state.applied = info.state.committed; pVnode->state.applied = info.state.committed;
pVnode->state.applyTerm = info.state.commitTerm; pVnode->state.applyTerm = info.state.commitTerm;
pVnode->state.ttlExpireTime = info.state.ttlExpireTime;
pVnode->pTfs = pTfs; pVnode->pTfs = pTfs;
pVnode->diskPrimary = diskPrimary; pVnode->diskPrimary = diskPrimary;
pVnode->msgCb = msgCb; pVnode->msgCb = msgCb;
......
...@@ -720,7 +720,9 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, ...@@ -720,7 +720,9 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq,
} }
vDebug("vgId:%d, drop ttl table req will be processed, time:%" PRId32, pVnode->config.vgId, ttlReq.timestampSec); vDebug("vgId:%d, drop ttl table req will be processed, time:%" PRId32, pVnode->config.vgId, ttlReq.timestampSec);
code = metaTtlSetExpireTime(pVnode->pMeta, (int64_t)ttlReq.timestampSec * 1000); int64_t ttlExpireTimeMs = (int64_t)ttlReq.timestampSec * 1000;
atomic_store_64(&pVnode->state.ttlExpireTime, ttlExpireTimeMs);
code = metaTtlSetExpireTime(pVnode->pMeta, ttlExpireTimeMs);
if (code) goto end; if (code) goto end;
code = vnodeAsyncTtlDropTable(pVnode); code = vnodeAsyncTtlDropTable(pVnode);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册