提交 d87a2b18 编写于 作者: S Shungang Li

flag 'hasTtlTask' for single task

上级 474aeb44
...@@ -77,7 +77,7 @@ int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int ...@@ -77,7 +77,7 @@ int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int
struct SMeta { struct SMeta {
TdThreadRwlock lock; TdThreadRwlock lock;
tsem_t txnReady; // if false we should not write journal tsem_t txnReady; // if false, we should not write journal
char* path; char* path;
SVnode* pVnode; SVnode* pVnode;
......
...@@ -31,7 +31,7 @@ typedef enum DirtyEntryType { ...@@ -31,7 +31,7 @@ typedef enum DirtyEntryType {
} DirtyEntryType; } DirtyEntryType;
typedef struct STtlManger { typedef struct STtlManger {
TdThreadRwlock lock; //TdThreadRwlock lock; // use meta lock, no self lock needed
TTB* pOldTtlIdx; // btree<{deleteTime, tuid}, NULL> TTB* pOldTtlIdx; // btree<{deleteTime, tuid}, NULL>
......
...@@ -153,7 +153,7 @@ int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pR ...@@ -153,7 +153,7 @@ int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pR
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
int32_t metaTrimTables(SMeta* pMeta); int32_t metaTrimTables(SMeta* pMeta);
int32_t metaTtlSetExpireTime(SMeta* pMeta, int64_t timePointMs); int32_t metaTtlSetExpireTime(SMeta* pMeta, int64_t timePointMs);
int metaTtlDropTable(SMeta* pMeta, SArray* tbUids); int metaTtlDropTables(SMeta* pMeta, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs); int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
...@@ -415,6 +415,7 @@ struct SVnode { ...@@ -415,6 +415,7 @@ struct SVnode {
TdThreadMutex lock; TdThreadMutex lock;
bool blocked; bool blocked;
bool restored; bool restored;
bool hasTtlTask;
tsem_t syncSem; tsem_t syncSem;
int32_t blockSec; int32_t blockSec;
int64_t blockSeq; int64_t blockSeq;
......
...@@ -932,14 +932,19 @@ int metaTtlSetExpireTime(SMeta *pMeta, int64_t timePointMs) { ...@@ -932,14 +932,19 @@ int metaTtlSetExpireTime(SMeta *pMeta, int64_t timePointMs) {
return 0; return 0;
} }
int metaTtlDropTable(SMeta *pMeta, SArray *tbUids) { int metaTtlDropTables(SMeta *pMeta, SArray *tbUids) {
int ret = 0;
int64_t startMs = taosGetTimestampMs(); int64_t startMs = taosGetTimestampMs();
tsem_wait(&pMeta->txnReady); tsem_wait(&pMeta->txnReady);
metaWLock(pMeta);
ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn); ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
metaULock(pMeta);
tsem_post(&pMeta->txnReady); tsem_post(&pMeta->txnReady);
int ret = ttlMgrFindExpired(pMeta->pTtlMgr, tbUids); metaRLock(pMeta);
ret = ttlMgrFindExpired(pMeta->pTtlMgr, tbUids);
metaULock(pMeta);
if (ret != 0) { if (ret != 0) {
metaError("ttl failed to find expired table, ret:%d", ret); metaError("ttl failed to find expired table, ret:%d", ret);
return ret; return ret;
...@@ -959,6 +964,7 @@ int metaTtlDropTable(SMeta *pMeta, SArray *tbUids) { ...@@ -959,6 +964,7 @@ int metaTtlDropTable(SMeta *pMeta, SArray *tbUids) {
metaDropTableByUid(pMeta, uid, NULL); metaDropTableByUid(pMeta, uid, NULL);
metaULock(pMeta); metaULock(pMeta);
tsem_post(&pMeta->txnReady); tsem_post(&pMeta->txnReady);
taosUsleep(1);
} }
int64_t endMs = taosGetTimestampMs(); int64_t endMs = taosGetTimestampMs();
......
...@@ -73,7 +73,7 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo ...@@ -73,7 +73,7 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo
pTtlMgr->pTtlCache = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); pTtlMgr->pTtlCache = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
pTtlMgr->pDirtyUids = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); pTtlMgr->pDirtyUids = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosThreadRwlockInit(&pTtlMgr->lock, NULL); //taosThreadRwlockInit(&pTtlMgr->lock, NULL);
ret = ttlMgrFillCache(pTtlMgr); ret = ttlMgrFillCache(pTtlMgr);
if (ret < 0) { if (ret < 0) {
...@@ -150,7 +150,7 @@ static void ttlMgrCleanup(STtlManger *pTtlMgr) { ...@@ -150,7 +150,7 @@ static void ttlMgrCleanup(STtlManger *pTtlMgr) {
taosHashCleanup(pTtlMgr->pTtlCache); taosHashCleanup(pTtlMgr->pTtlCache);
taosHashCleanup(pTtlMgr->pDirtyUids); taosHashCleanup(pTtlMgr->pDirtyUids);
tdbTbClose(pTtlMgr->pTtlIdx); tdbTbClose(pTtlMgr->pTtlIdx);
taosThreadRwlockDestroy(&pTtlMgr->lock); //taosThreadRwlockDestroy(&pTtlMgr->lock);
taosMemoryFree(pTtlMgr); taosMemoryFree(pTtlMgr);
} }
...@@ -508,9 +508,7 @@ int ttlMgrSetExpireTime(STtlManger *pTtlMgr, int64_t timePointMs) { ...@@ -508,9 +508,7 @@ int ttlMgrSetExpireTime(STtlManger *pTtlMgr, int64_t timePointMs) {
static int32_t ttlMgrRLock(STtlManger *pTtlMgr) { static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
int32_t ret = 0; int32_t ret = 0;
metaTrace("%s, ttl mgr rlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock); //ret = taosThreadRwlockRdlock(&pTtlMgr->lock);
ret = taosThreadRwlockRdlock(&pTtlMgr->lock);
return ret; return ret;
} }
...@@ -518,9 +516,7 @@ static int32_t ttlMgrRLock(STtlManger *pTtlMgr) { ...@@ -518,9 +516,7 @@ static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
static int32_t ttlMgrWLock(STtlManger *pTtlMgr) { static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
int32_t ret = 0; int32_t ret = 0;
metaTrace("%s, ttl mgr wlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock); //ret = taosThreadRwlockWrlock(&pTtlMgr->lock);
ret = taosThreadRwlockWrlock(&pTtlMgr->lock);
return ret; return ret;
} }
...@@ -528,9 +524,7 @@ static int32_t ttlMgrWLock(STtlManger *pTtlMgr) { ...@@ -528,9 +524,7 @@ static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
static int32_t ttlMgrULock(STtlManger *pTtlMgr) { static int32_t ttlMgrULock(STtlManger *pTtlMgr) {
int32_t ret = 0; int32_t ret = 0;
metaTrace("%s, ttl mgr ulock %p", pTtlMgr->logPrefix, &pTtlMgr->lock); //ret = taosThreadRwlockUnlock(&pTtlMgr->lock);
ret = taosThreadRwlockUnlock(&pTtlMgr->lock);
return ret; return ret;
} }
...@@ -362,7 +362,7 @@ static int32_t vnodeTtlTask(void *arg) { ...@@ -362,7 +362,7 @@ static int32_t vnodeTtlTask(void *arg) {
SVnode *pVnode = pInfo->pVnode; SVnode *pVnode = pInfo->pVnode;
SArray *tbUids = taosArrayInit(8, sizeof(int64_t)); SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
code = metaTtlDropTable(pVnode->pMeta, tbUids); code = metaTtlDropTables(pVnode->pMeta, tbUids);
if (code) { if (code) {
vFatal("vgId:%d, meta failed to drop table by ttl since %s", TD_VID(pVnode), terrstr()); vFatal("vgId:%d, meta failed to drop table by ttl since %s", TD_VID(pVnode), terrstr());
goto _exit; goto _exit;
...@@ -373,7 +373,7 @@ static int32_t vnodeTtlTask(void *arg) { ...@@ -373,7 +373,7 @@ static int32_t vnodeTtlTask(void *arg) {
} }
_exit: _exit:
// end commit pVnode->hasTtlTask = false;
taosArrayDestroy(tbUids); taosArrayDestroy(tbUids);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
return code; return code;
......
...@@ -371,6 +371,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC ...@@ -371,6 +371,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
pVnode->msgCb = msgCb; pVnode->msgCb = msgCb;
taosThreadMutexInit(&pVnode->lock, NULL); taosThreadMutexInit(&pVnode->lock, NULL);
pVnode->blocked = false; pVnode->blocked = false;
pVnode->hasTtlTask = false;
tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&pVnode->syncSem, 0, 0);
tsem_init(&(pVnode->canCommit), 0, 1); tsem_init(&(pVnode->canCommit), 0, 1);
......
...@@ -725,8 +725,14 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, ...@@ -725,8 +725,14 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq,
code = metaTtlSetExpireTime(pVnode->pMeta, ttlExpireTimeMs); code = metaTtlSetExpireTime(pVnode->pMeta, ttlExpireTimeMs);
if (code) goto end; if (code) goto end;
code = vnodeAsyncTtlDropTable(pVnode); if (!pVnode->hasTtlTask) {
if (code) goto end; pVnode->hasTtlTask = true;
code = vnodeAsyncTtlDropTable(pVnode);
if (code) {
pVnode->hasTtlTask = false;
goto end;
}
}
code = vnodeDoRetention(pVnode, ttlReq.timestampSec); code = vnodeDoRetention(pVnode, ttlReq.timestampSec);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册