提交 d5180b01 编写于 作者: S Shungang Li

simple impl for ttltask control

上级 98bbde34
...@@ -42,6 +42,8 @@ typedef struct SMetaCache SMetaCache; ...@@ -42,6 +42,8 @@ typedef struct SMetaCache SMetaCache;
int32_t metaRLock(SMeta* pMeta); int32_t metaRLock(SMeta* pMeta);
int32_t metaWLock(SMeta* pMeta); int32_t metaWLock(SMeta* pMeta);
int32_t metaULock(SMeta* pMeta); int32_t metaULock(SMeta* pMeta);
int32_t metaWaitTxnReadyAndWLock(SMeta* pMeta);
int32_t metaULockAndPostTxnReady(SMeta* pMeta);
// metaEntry ================== // metaEntry ==================
int metaEncodeEntry(SEncoder* pCoder, const SMetaEntry* pME); int metaEncodeEntry(SEncoder* pCoder, const SMetaEntry* pME);
...@@ -77,7 +79,7 @@ int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int ...@@ -77,7 +79,7 @@ int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int
struct SMeta { struct SMeta {
TdThreadRwlock lock; TdThreadRwlock lock;
tsem_t txnReady; // if false, we should not write journal tsem_t txnReady; // vnode-write: wait in 'metaCommit' and post in 'metaBegin'
char* path; char* path;
SVnode* pVnode; SVnode* pVnode;
......
...@@ -153,7 +153,7 @@ int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pR ...@@ -153,7 +153,7 @@ int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pR
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
int32_t metaTrimTables(SMeta* pMeta); int32_t metaTrimTables(SMeta* pMeta);
int32_t metaTtlSetExpireTime(SMeta* pMeta, int64_t timePointMs); int32_t metaTtlSetExpireTime(SMeta* pMeta, int64_t timePointMs);
int metaTtlDropTables(SMeta* pMeta, SArray* tbUids); int metaTtlDropTables(SMeta* pMeta, SArray* tbUids, bool* pShallAbort);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs); int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
...@@ -415,7 +415,8 @@ struct SVnode { ...@@ -415,7 +415,8 @@ struct SVnode {
TdThreadMutex lock; TdThreadMutex lock;
bool blocked; bool blocked;
bool restored; bool restored;
bool hasTtlTask; bool ttlTaskProcessing;
bool ttlTaskShallAbort;
tsem_t syncSem; tsem_t syncSem;
int32_t blockSec; int32_t blockSec;
int64_t blockSeq; int64_t blockSeq;
......
...@@ -265,6 +265,24 @@ int32_t metaULock(SMeta *pMeta) { ...@@ -265,6 +265,24 @@ int32_t metaULock(SMeta *pMeta) {
return ret; return ret;
} }
int32_t metaWaitTxnReadyAndWLock(SMeta *pMeta) {
int32_t ret = 0;
tsem_wait(&pMeta->txnReady);
ret = metaWLock(pMeta);
return ret;
}
int32_t metaULockAndPostTxnReady(SMeta *pMeta) {
int32_t ret = 0;
ret = metaULock(pMeta);
tsem_post(&pMeta->txnReady);
return ret;
}
static void metaCleanup(SMeta **ppMeta) { static void metaCleanup(SMeta **ppMeta) {
SMeta *pMeta = *ppMeta; SMeta *pMeta = *ppMeta;
if (pMeta) { if (pMeta) {
......
...@@ -932,15 +932,13 @@ int metaTtlSetExpireTime(SMeta *pMeta, int64_t timePointMs) { ...@@ -932,15 +932,13 @@ int metaTtlSetExpireTime(SMeta *pMeta, int64_t timePointMs) {
return 0; return 0;
} }
int metaTtlDropTables(SMeta *pMeta, SArray *tbUids) { int metaTtlDropTables(SMeta *pMeta, SArray *tbUids, bool* pShallAbort) {
int ret = 0; int ret = 0;
int64_t startMs = taosGetTimestampMs(); int64_t startMs = taosGetTimestampMs();
tsem_wait(&pMeta->txnReady); metaWaitTxnReadyAndWLock(pMeta);
metaWLock(pMeta);
ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn); ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
metaULock(pMeta); metaULockAndPostTxnReady(pMeta);
tsem_post(&pMeta->txnReady);
metaRLock(pMeta); metaRLock(pMeta);
ret = ttlMgrFindExpired(pMeta->pTtlMgr, tbUids); ret = ttlMgrFindExpired(pMeta->pTtlMgr, tbUids);
...@@ -959,16 +957,15 @@ int metaTtlDropTables(SMeta *pMeta, SArray *tbUids) { ...@@ -959,16 +957,15 @@ int metaTtlDropTables(SMeta *pMeta, SArray *tbUids) {
for (int i = 0; i < TARRAY_SIZE(tbUids); ++i) { for (int i = 0; i < TARRAY_SIZE(tbUids); ++i) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i); tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i);
tsem_wait(&pMeta->txnReady); if (*pShallAbort) break;
metaWLock(pMeta); metaWaitTxnReadyAndWLock(pMeta);
metaDropTableByUid(pMeta, uid, NULL); metaDropTableByUid(pMeta, uid, NULL);
metaULock(pMeta); metaULockAndPostTxnReady(pMeta);
tsem_post(&pMeta->txnReady);
taosUsleep(1); taosUsleep(1);
} }
int64_t endMs = taosGetTimestampMs(); int64_t endMs = taosGetTimestampMs();
metaInfo("ttl drop table finished, time consumed:%" PRId64 "ms", endMs - startMs); metaInfo("ttl drop table finished, time consumed:%" PRId64 "ms, isAbort:%d", endMs - startMs, *pShallAbort);
return 0; return 0;
} }
......
...@@ -362,7 +362,7 @@ static int32_t vnodeTtlTask(void *arg) { ...@@ -362,7 +362,7 @@ static int32_t vnodeTtlTask(void *arg) {
SVnode *pVnode = pInfo->pVnode; SVnode *pVnode = pInfo->pVnode;
SArray *tbUids = taosArrayInit(8, sizeof(int64_t)); SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
code = metaTtlDropTables(pVnode->pMeta, tbUids); code = metaTtlDropTables(pVnode->pMeta, tbUids, &pVnode->ttlTaskShallAbort);
if (code) { if (code) {
vFatal("vgId:%d, meta failed to drop table by ttl since %s", TD_VID(pVnode), terrstr()); vFatal("vgId:%d, meta failed to drop table by ttl since %s", TD_VID(pVnode), terrstr());
goto _exit; goto _exit;
...@@ -373,7 +373,7 @@ static int32_t vnodeTtlTask(void *arg) { ...@@ -373,7 +373,7 @@ static int32_t vnodeTtlTask(void *arg) {
} }
_exit: _exit:
pVnode->hasTtlTask = false; pVnode->ttlTaskProcessing = false;
taosArrayDestroy(tbUids); taosArrayDestroy(tbUids);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
return code; return code;
......
...@@ -371,7 +371,8 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC ...@@ -371,7 +371,8 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
pVnode->msgCb = msgCb; pVnode->msgCb = msgCb;
taosThreadMutexInit(&pVnode->lock, NULL); taosThreadMutexInit(&pVnode->lock, NULL);
pVnode->blocked = false; pVnode->blocked = false;
pVnode->hasTtlTask = false; pVnode->ttlTaskProcessing = false;
pVnode->ttlTaskShallAbort = false;
tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&pVnode->syncSem, 0, 0);
tsem_init(&(pVnode->canCommit), 0, 1); tsem_init(&(pVnode->canCommit), 0, 1);
...@@ -476,6 +477,11 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); } ...@@ -476,6 +477,11 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
void vnodeClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
pVnode->ttlTaskShallAbort = true;
while (pVnode->ttlTaskProcessing) {
taosMsleep(10);
}
tsem_wait(&pVnode->canCommit); tsem_wait(&pVnode->canCommit);
vnodeSyncClose(pVnode); vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode); vnodeQueryClose(pVnode);
......
...@@ -725,11 +725,11 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, ...@@ -725,11 +725,11 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq,
code = metaTtlSetExpireTime(pVnode->pMeta, ttlExpireTimeMs); code = metaTtlSetExpireTime(pVnode->pMeta, ttlExpireTimeMs);
if (code) goto end; if (code) goto end;
if (!pVnode->hasTtlTask) { if (!pVnode->ttlTaskProcessing) {
pVnode->hasTtlTask = true; pVnode->ttlTaskProcessing = true;
code = vnodeAsyncTtlDropTable(pVnode); code = vnodeAsyncTtlDropTable(pVnode);
if (code) { if (code) {
pVnode->hasTtlTask = false; pVnode->ttlTaskProcessing = false;
goto end; goto end;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册