提交 b38c4a9d 编写于 作者: S Shungang Li

tmp for async ttl table drop

上级 a74ab0e7
......@@ -39,6 +39,8 @@ typedef struct STtlManger {
SHashObj* pDirtyUids; // hash<dirtyTuid, entryType>
TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl>
int64_t expireTimeMs; // ttl table expiration time
char* logPrefix;
int32_t flushThreshold; // max dirty entry number in memory. if -1, flush will not be triggered by write-ops
} STtlManger;
......@@ -96,7 +98,9 @@ int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx);
int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx);
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn);
int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids);
int ttlMgrFindExpired(STtlManger* pTtlMgr, SArray* pTbUids);
int ttlMgrSetExpireTime(STtlManger* pTtlMgr, int64_t timePointMs);
#ifdef __cplusplus
}
......
......@@ -107,6 +107,7 @@ int32_t vnodeCommitInfo(const char* dir);
int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
int32_t vnodeSyncCommit(SVnode* pVnode);
int32_t vnodeAsyncCommit(SVnode* pVnode);
int32_t vnodeAsyncTtlDropTable(SVnode* pVnode);
bool vnodeShouldRollback(SVnode* pVnode);
// vnodeSync.c
......
......@@ -77,6 +77,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader;
typedef struct SRSmaSnapWriter SRSmaSnapWriter;
typedef struct SSnapDataHdr SSnapDataHdr;
typedef struct SCommitInfo SCommitInfo;
typedef struct STtlInfo STtlInfo;
typedef struct SCompactInfo SCompactInfo;
typedef struct SQueryNode SQueryNode;
......@@ -151,7 +152,8 @@ int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
int32_t metaTrimTables(SMeta* pMeta);
int metaTtlDropTable(SMeta* pMeta, int64_t timePointMs, SArray* tbUids);
int32_t metaTtlSetExpireTime(SMeta* pMeta, int64_t timePointMs);
int metaTtlDropTable(SMeta* pMeta, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
......@@ -491,6 +493,10 @@ struct SCommitInfo {
TXN* txn;
};
struct STtlInfo {
SVnode* pVnode;
};
struct SCompactInfo {
SVnode* pVnode;
int32_t flag;
......
......@@ -927,7 +927,12 @@ end:
return code;
}
int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) {
int metaTtlSetExpireTime(SMeta *pMeta, int64_t timePointMs) {
ttlMgrSetExpireTime(pMeta->pTtlMgr, timePointMs);
return 0;
}
int metaTtlDropTable(SMeta *pMeta, SArray *tbUids) {
int64_t startNs = taosGetTimestampNs();
int ret = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
......@@ -936,7 +941,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) {
return ret;
}
ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids);
ret = ttlMgrFindExpired(pMeta->pTtlMgr, tbUids);
if (ret != 0) {
metaError("ttl failed to find expired table, ret:%d", ret);
return ret;
......
......@@ -59,6 +59,7 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo
strcpy(logBuffer, logPrefix);
pTtlMgr->logPrefix = logBuffer;
pTtlMgr->flushThreshold = flushThreshold;
pTtlMgr->expireTimeMs = 0; // TODO(LSG)
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
if (ret < 0) {
......@@ -358,9 +359,14 @@ _out:
return ret;
}
int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids) {
int ttlMgrFindExpired(STtlManger *pTtlMgr, SArray *pTbUids) {
ttlMgrRLock(pTtlMgr);
if (pTtlMgr->expireTimeMs == 0) {
metaError("%s, ttl expireTimeMs uninitialized, skip find expired", pTtlMgr->logPrefix);
return 0;
}
TBC *pCur;
int ret = tdbTbcOpen(pTtlMgr->pTtlIdx, &pCur, NULL);
if (ret < 0) {
......@@ -368,7 +374,7 @@ int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids)
}
STtlIdxKeyV1 ttlKey = {0};
ttlKey.deleteTimeMs = timePointMs;
ttlKey.deleteTimeMs = pTtlMgr->expireTimeMs;
ttlKey.uid = INT64_MAX;
int c = 0;
tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c);
......@@ -475,6 +481,20 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
return ret;
}
int ttlMgrSetExpireTime(STtlManger *pTtlMgr, int64_t timePointMs) {
ttlMgrWLock(pTtlMgr);
if (pTtlMgr->expireTimeMs > timePointMs) {
metaWarn("%s, ttl mgr new expireTimeMs:%" PRId64 " is smaller than old:%" PRId64 ". maybe restoring?",
pTtlMgr->logPrefix, timePointMs, pTtlMgr->expireTimeMs);
} else {
pTtlMgr->expireTimeMs = timePointMs;
}
ttlMgrULock(pTtlMgr);
return 0;
}
static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
int32_t ret = 0;
......
......@@ -353,6 +353,59 @@ static void vnodeReturnBufPool(SVnode *pVnode) {
taosThreadMutexUnlock(&pVnode->mutex);
}
static int32_t vnodeTtlTask(void *arg) {
int32_t code = 0;
STtlInfo *pInfo = (STtlInfo *)arg;
SVnode *pVnode = pInfo->pVnode;
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
code = metaTtlDropTable(pVnode->pMeta, tbUids);
if (code) {
vFatal("vgId:%d, meta failed to drop table by ttl since %s", TD_VID(pVnode), terrstr());
goto _exit;
}
// if (taosArrayGetSize(tbUids) > 0) {
// tqUpdateTbUidList(pVnode->pTq, tbUids, false);
// }
// TODO(LSG)
//vnodeAsyncRentention(pVnode, ttlReq.timestampSec);
_exit:
// end commit
taosMemoryFree(pInfo);
return code;
}
int vnodeAsyncTtlDropTable(SVnode *pVnode) {
int32_t code = 0;
STtlInfo *pInfo = (STtlInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
if (NULL == pInfo) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// schedule the task
code = vnodeScheduleTask(vnodeTtlTask, pInfo);
_exit:
if (code) {
if (NULL != pInfo) {
taosMemoryFree(pInfo);
}
vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
pVnode->state.commitID);
} else {
vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode),
pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied);
}
return code;
}
static int32_t vnodeCommitTask(void *arg) {
int32_t code = 0;
......
......@@ -710,29 +710,23 @@ _exit:
}
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = 0;
SVDropTtlTableReq ttlReq = {0};
// decode
if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
code = TSDB_CODE_INVALID_MSG;
goto end;
}
vDebug("vgId:%d, drop ttl table req will be processed, time:%" PRId32, pVnode->config.vgId, ttlReq.timestampSec);
int32_t ret = metaTtlDropTable(pVnode->pMeta, (int64_t)ttlReq.timestampSec * 1000, tbUids);
if (ret != 0) {
goto end;
}
if (taosArrayGetSize(tbUids) > 0) {
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
}
code = metaTtlSetExpireTime(pVnode->pMeta, (int64_t)ttlReq.timestampSec * 1000);
if (code) goto end;
vnodeDoRetention(pVnode, ttlReq.timestampSec);
code = vnodeAsyncTtlDropTable(pVnode);
end:
taosArrayDestroy(tbUids);
return ret;
return code;
}
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册