提交 a31e0541 编写于 作者: S Shungang Li

fix: ttlmgr convert in metaUpgrade

上级 a1a2b833
...@@ -79,16 +79,18 @@ typedef struct { ...@@ -79,16 +79,18 @@ typedef struct {
TXN* pTxn; TXN* pTxn;
} STtlDelTtlCtx; } STtlDelTtlCtx;
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback); int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback);
int ttlMgrClose(STtlManger* pTtlMgr); void ttlMgrClose(STtlManger* pTtlMgr);
int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta); int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta);
int ttlMgrConvert(TTB* pOldTtlIdx, TTB* pNewTtlIdx, void* pMeta); bool ttlMgrNeedUpgrade(TDB* pEnv);
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn); int ttlMgrUpgrade(STtlManger* pTtlMgr, void* pMeta);
int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx); int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx);
int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx); int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx);
int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx); int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx);
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn);
int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids); int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -136,7 +136,7 @@ typedef struct STbUidStore STbUidStore; ...@@ -136,7 +136,7 @@ typedef struct STbUidStore STbUidStore;
#define META_BEGIN_HEAP_NIL 2 #define META_BEGIN_HEAP_NIL 2
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback); int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
int metaPostOpen(SVnode* pVnode, SMeta** ppMeta); // for operations depend on "meta txn" int metaUpgrade(SVnode* pVnode, SMeta** ppMeta);
int metaClose(SMeta** pMeta); int metaClose(SMeta** pMeta);
int metaBegin(SMeta* pMeta, int8_t fromSys); int metaBegin(SMeta* pMeta, int8_t fromSys);
TXN* metaGetTxn(SMeta* pMeta); TXN* metaGetTxn(SMeta* pMeta);
......
...@@ -186,18 +186,35 @@ _err: ...@@ -186,18 +186,35 @@ _err:
return -1; return -1;
} }
int metaPostOpen(SVnode *pVnode, SMeta **ppMeta) { int metaUpgrade(SVnode *pVnode, SMeta **ppMeta) {
int code = TSDB_CODE_SUCCESS;
SMeta *pMeta = *ppMeta; SMeta *pMeta = *ppMeta;
if (ttlMgrPostOpen(pMeta->pTtlMgr, pMeta) < 0) {
metaError("vgId:%d, failed to post open meta ttl since %s", TD_VID(pVnode), tstrerror(terrno)); if (ttlMgrNeedUpgrade(pMeta->pEnv)) {
goto _err; code = metaBegin(pMeta, META_BEGIN_HEAP_OS);
if (code < 0) {
metaError("vgId:%d, failed to upgrade meta, meta begin failed since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
code = ttlMgrUpgrade(pMeta->pTtlMgr, pMeta);
if (code < 0) {
metaError("vgId:%d, failed to upgrade meta ttl since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
code = metaCommit(pMeta, pMeta->txn);
if (code < 0) {
metaError("vgId:%d, failed to upgrade meta ttl, meta commit failed since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
} }
return 0; return TSDB_CODE_SUCCESS;
_err: _err:
metaCleanup(ppMeta); metaCleanup(ppMeta);
return -1; return code;
} }
int metaClose(SMeta **ppMeta) { int metaClose(SMeta **ppMeta) {
......
...@@ -21,6 +21,10 @@ typedef struct { ...@@ -21,6 +21,10 @@ typedef struct {
SMeta *pMeta; SMeta *pMeta;
} SConvertData; } SConvertData;
static void ttlMgrCleanup(STtlManger *pTtlMgr);
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta);
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid); static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid);
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
...@@ -36,27 +40,17 @@ const char *ttlTbname = "ttl.idx"; ...@@ -36,27 +40,17 @@ const char *ttlTbname = "ttl.idx";
const char *ttlV1Tbname = "ttlv1.idx"; const char *ttlV1Tbname = "ttlv1.idx";
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
int ret; int ret = TSDB_CODE_SUCCESS;
int64_t startNs = taosGetTimestampNs();
*ppTtlMgr = NULL; *ppTtlMgr = NULL;
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr)); STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
if (pTtlMgr == NULL) { if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tdbTbExist(ttlTbname, pEnv)) {
ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pEnv, &pTtlMgr->pOldTtlIdx, rollback);
if (ret < 0) {
metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno));
return ret;
}
}
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback); ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
if (ret < 0) { if (ret < 0) {
metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno)); metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno));
tdbOsFree(pTtlMgr); tdbOsFree(pTtlMgr);
return ret; return ret;
} }
...@@ -66,42 +60,57 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { ...@@ -66,42 +60,57 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
taosThreadRwlockInit(&pTtlMgr->lock, NULL); taosThreadRwlockInit(&pTtlMgr->lock, NULL);
ret = ttlMgrFillCache(pTtlMgr);
if (ret < 0) {
metaError("failed to fill hash since %s", tstrerror(terrno));
ttlMgrCleanup(pTtlMgr);
return ret;
}
int64_t endNs = taosGetTimestampNs();
metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
endNs - startNs);
*ppTtlMgr = pTtlMgr; *ppTtlMgr = pTtlMgr;
return 0; return TSDB_CODE_SUCCESS;
} }
int ttlMgrClose(STtlManger *pTtlMgr) { void ttlMgrClose(STtlManger *pTtlMgr) { ttlMgrCleanup(pTtlMgr); }
taosHashCleanup(pTtlMgr->pTtlCache);
taosHashCleanup(pTtlMgr->pDirtyUids); bool ttlMgrNeedUpgrade(TDB *pEnv) {
tdbTbClose(pTtlMgr->pTtlIdx); bool needUpgrade = tdbTbExist(ttlTbname, pEnv);
taosThreadRwlockDestroy(&pTtlMgr->lock); if (needUpgrade) {
tdbOsFree(pTtlMgr); metaInfo("find ttl idx in old version , will convert");
return 0; }
return needUpgrade;
} }
int ttlMgrPostOpen(STtlManger *pTtlMgr, void *pMeta) { int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
metaInfo("ttl mgr start post open"); SMeta *meta = (SMeta *)pMeta;
int ret; int ret = TSDB_CODE_SUCCESS;
int64_t startNs = taosGetTimestampNs(); if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS;
SMeta *meta = (SMeta *)pMeta; metaInfo("ttl mgr start upgrade");
if (pTtlMgr->pOldTtlIdx) { int64_t startNs = taosGetTimestampNs();
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
if (ret < 0) {
metaError("failed to convert ttl index since %s", tstrerror(terrno));
goto _out;
}
ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn); ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
if (ret < 0) { if (ret < 0) {
metaError("failed to drop old ttl index since %s", tstrerror(terrno)); metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno));
goto _out; goto _out;
} }
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
if (ret < 0) {
metaError("failed to convert ttl index since %s", tstrerror(terrno));
goto _out;
}
tdbTbClose(pTtlMgr->pOldTtlIdx); ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn);
pTtlMgr->pOldTtlIdx = NULL; if (ret < 0) {
metaError("failed to drop old ttl index since %s", tstrerror(terrno));
goto _out;
} }
ret = ttlMgrFillCache(pTtlMgr); ret = ttlMgrFillCache(pTtlMgr);
...@@ -111,13 +120,23 @@ int ttlMgrPostOpen(STtlManger *pTtlMgr, void *pMeta) { ...@@ -111,13 +120,23 @@ int ttlMgrPostOpen(STtlManger *pTtlMgr, void *pMeta) {
} }
int64_t endNs = taosGetTimestampNs(); int64_t endNs = taosGetTimestampNs();
metaInfo("ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
metaInfo("ttl mgr post open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
endNs - startNs); endNs - startNs);
_out: _out:
tdbTbClose(pTtlMgr->pOldTtlIdx);
pTtlMgr->pOldTtlIdx = NULL;
return ret; return ret;
} }
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
taosHashCleanup(pTtlMgr->pTtlCache);
taosHashCleanup(pTtlMgr->pDirtyUids);
tdbTbClose(pTtlMgr->pTtlIdx);
taosThreadRwlockDestroy(&pTtlMgr->lock);
tdbOsFree(pTtlMgr);
}
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) { static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
if (ttlDays <= 0) return; if (ttlDays <= 0) return;
...@@ -205,7 +224,7 @@ _out: ...@@ -205,7 +224,7 @@ _out:
return ret; return ret;
} }
int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) { static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
SMeta *meta = pMeta; SMeta *meta = pMeta;
metaInfo("ttlMgr convert ttl start."); metaInfo("ttlMgr convert ttl start.");
......
...@@ -371,6 +371,10 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -371,6 +371,10 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto _err; goto _err;
} }
if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) {
vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno));
}
// open tsdb // open tsdb
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) { if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) {
vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
...@@ -416,11 +420,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -416,11 +420,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto _err; goto _err;
} }
if (metaPostOpen(pVnode, &pVnode->pMeta) < 0) {
vError("vgId:%d, failed to post open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open sync // open sync
if (vnodeSyncOpen(pVnode, dir)) { if (vnodeSyncOpen(pVnode, dir)) {
vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册