From 77c453e5874e2e24d149cf78a5ffb4275b784bd3 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Thu, 17 Aug 2023 18:47:00 +0800 Subject: [PATCH] committer and writer mutual exclusion --- source/dnode/vnode/src/inc/meta.h | 4 ++- source/dnode/vnode/src/meta/metaCommit.c | 2 ++ source/dnode/vnode/src/meta/metaOpen.c | 23 ++++++++++++- source/dnode/vnode/src/meta/metaQuery.c | 4 +-- source/dnode/vnode/src/meta/metaSma.c | 2 +- source/dnode/vnode/src/meta/metaTable.c | 43 +++++++++++++----------- 6 files changed, 54 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 9ef61a21d8..3b61d8ae5c 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -41,6 +41,7 @@ typedef struct SMetaCache SMetaCache; // metaOpen ================== int32_t metaRLock(SMeta* pMeta); int32_t metaWLock(SMeta* pMeta); +int32_t metaCheckTtlTaskAndWLock(SMeta* pMeta); int32_t metaULock(SMeta* pMeta); int32_t metaWaitTxnReadyAndWLock(SMeta* pMeta); int32_t metaULockAndPostTxnReady(SMeta* pMeta); @@ -79,7 +80,8 @@ int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int struct SMeta { TdThreadRwlock lock; - tsem_t txnReady; // vnode-write: wait in 'metaCommit' and post in 'metaBegin' + tsem_t txnReady; // vnode-write: wait in 'metaCommit' and post in 'metaBegin' + tsem_t writerWaiting; char* path; SVnode* pVnode; diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c index d543b3d727..2a56c323e6 100644 --- a/source/dnode/vnode/src/meta/metaCommit.c +++ b/source/dnode/vnode/src/meta/metaCommit.c @@ -48,7 +48,9 @@ int metaBegin(SMeta *pMeta, int8_t heap) { // commit the meta txn TXN *metaGetTxn(SMeta *pMeta) { return pMeta->txn; } int metaCommit(SMeta *pMeta, TXN *txn) { + tsem_wait(&pMeta->writerWaiting); tsem_wait(&pMeta->txnReady); + tsem_post(&pMeta->writerWaiting); ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn); return tdbCommit(pMeta->pEnv, txn); diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index c15d34f9d8..fee50e8de6 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -52,6 +52,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { metaInitLock(pMeta); tsem_init(&(pMeta->txnReady), 0, 0); + tsem_init(&(pMeta->writerWaiting), 0, 1); pMeta->path = (char *)&pMeta[1]; strcpy(pMeta->path, path); @@ -224,7 +225,7 @@ int metaClose(SMeta **ppMeta) { } int metaAlterCache(SMeta *pMeta, int32_t nPage) { - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); if (tdbAlter(pMeta->pEnv, nPage) < 0) { metaULock(pMeta); @@ -255,6 +256,23 @@ int32_t metaWLock(SMeta *pMeta) { return ret; } +int32_t metaCheckTtlTaskAndWLock(SMeta *pMeta) { + int32_t ret = 0; + + bool needPost = false; + if (pMeta->pVnode->ttlTaskProcessing) { + tsem_wait(&pMeta->writerWaiting); + needPost = true; + } + + ret = metaWLock(pMeta); + if (needPost) { + tsem_post(&pMeta->writerWaiting); + } + + return ret; +} + int32_t metaULock(SMeta *pMeta) { int32_t ret = 0; @@ -268,6 +286,9 @@ int32_t metaULock(SMeta *pMeta) { int32_t metaWaitTxnReadyAndWLock(SMeta *pMeta) { int32_t ret = 0; + tsem_wait(&pMeta->writerWaiting); + tsem_post(&pMeta->writerWaiting); + tsem_wait(&pMeta->txnReady); ret = metaWLock(pMeta); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index c26bb45c2b..c183608210 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1465,7 +1465,7 @@ int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo, SMetaReader *pR } } // upsert the cache - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); metaCacheUpsert(pMeta, pInfo); metaULock(pMeta); @@ -1504,7 +1504,7 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables) { state.ctbNum = ctbNum; // upsert the cache - metaWLock(pVnodeObj->pMeta); + metaCheckTtlTaskAndWLock(pVnodeObj->pMeta); metaStatsCacheUpsert(pVnodeObj->pMeta, &state); metaULock(pVnodeObj->pMeta); diff --git a/source/dnode/vnode/src/meta/metaSma.c b/source/dnode/vnode/src/meta/metaSma.c index 91704f5c7a..dcb4acb7f4 100644 --- a/source/dnode/vnode/src/meta/metaSma.c +++ b/source/dnode/vnode/src/meta/metaSma.c @@ -147,7 +147,7 @@ static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) { } static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) { - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); // save to table.db if (metaSaveSmaToDB(pMeta, pME) < 0) goto _err; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 333e3e8a55..8f11245d72 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -269,7 +269,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c); if (rc < 0) { tdbTbcClose(pCtbIdxc); - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); goto _drop_super_table; } @@ -288,7 +288,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb tdbTbcClose(pCtbIdxc); - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) { tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUidList, iChild); @@ -376,7 +376,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { nStbEntry.stbEntry.schemaRow = pReq->schemaRow; nStbEntry.stbEntry.schemaTag = pReq->schemaTag; - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); // compare two entry if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) { metaSaveToSkmDb(pMeta, &nStbEntry); @@ -519,7 +519,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { goto _err; } - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); metaULock(pMeta); metaDestroyTagIdxKey(pTagIdxKey); @@ -532,7 +532,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { nStbEntry.stbEntry.schemaRow = pReq->schemaRow; nStbEntry.stbEntry.schemaTag = pReq->schemaTag; - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); // update table.db metaSaveToTbDb(pMeta, &nStbEntry); // update uid index @@ -649,7 +649,7 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) goto _err; } - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn); metaULock(pMeta); metaDestroyTagIdxKey(pTagIdxKey); @@ -670,7 +670,7 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) nStbEntry.stbEntry.schemaTag = *tag; nStbEntry.stbEntry.rsmaParam = oStbEntry.stbEntry.rsmaParam; - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); // update table.db metaSaveToTbDb(pMeta, &nStbEntry); // update uid index @@ -768,7 +768,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs ++pMeta->pVnode->config.vndStats.numOfCTables; - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1); metaUidCacheClear(pMeta, me.ctbEntry.suid); metaTbGroupCacheClear(pMeta, me.ctbEntry.suid); @@ -826,7 +826,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi } uid = *(tb_uid_t *)pData; - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); metaDropTableByUid(pMeta, uid, &type); metaULock(pMeta); @@ -843,7 +843,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi } static void metaDropTables(SMeta *pMeta, SArray *tbUids) { - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); for (int i = 0; i < TARRAY_SIZE(tbUids); ++i) { tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i); metaDropTableByUid(pMeta, uid, NULL); @@ -954,14 +954,19 @@ int metaTtlDropTables(SMeta *pMeta, SArray *tbUids, bool* pShallAbort) { metaInfo("ttl find expired table count: %zu", TARRAY_SIZE(tbUids)); + const int releaseLockRound = 50; for (int i = 0; i < TARRAY_SIZE(tbUids); ++i) { tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i); if (*pShallAbort) break; - metaWaitTxnReadyAndWLock(pMeta); + if (i % releaseLockRound == 0) { + metaWaitTxnReadyAndWLock(pMeta); + } metaDropTableByUid(pMeta, uid, NULL); - metaULockAndPostTxnReady(pMeta); - taosUsleep(1); + if (i % releaseLockRound == releaseLockRound - 1 || i == TARRAY_SIZE(tbUids) - 1) { + metaULockAndPostTxnReady(pMeta); + //sched_yield(); + } } int64_t endMs = taosGetTimestampMs(); @@ -1336,7 +1341,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl entry.version = version; // do actual write - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); metaDeleteNcolIdx(pMeta, &oldEntry); metaUpdateNcolIdx(pMeta, &entry); @@ -1514,7 +1519,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA taosArrayDestroy(pTagArray); } - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); // save to table.db metaSaveToTbDb(pMeta, &ctbEntry); @@ -1624,7 +1629,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p } entry.version = version; - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); // build SMetaEntry if (entry.type == TSDB_CHILD_TABLE) { if (pAlterTbReq->updateTTL) { @@ -1889,7 +1894,7 @@ static int metaDropTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterT } tdbTbcClose(pTagIdxc); - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); for (int i = 0; i < taosArrayGetSize(tagIdxList); i++) { SMetaPair *pair = taosArrayGet(tagIdxList, i); tdbTbDelete(pMeta->pTagIdx, pair->key, pair->nkey, pMeta->txn); @@ -2028,7 +2033,7 @@ static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs } int metaUpdateChangeTimeWithLock(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) { - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); int ret = metaUpdateChangeTime(pMeta, uid, changeTimeMs); metaULock(pMeta); return ret; @@ -2209,7 +2214,7 @@ _exit: int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) { int32_t code = 0; int32_t line = 0; - metaWLock(pMeta); + metaCheckTtlTaskAndWLock(pMeta); // save to table.db code = metaSaveToTbDb(pMeta, pME); -- GitLab