提交 77c453e5 编写于 作者: S Shungang Li

committer and writer mutual exclusion

上级 d5180b01
...@@ -41,6 +41,7 @@ typedef struct SMetaCache SMetaCache; ...@@ -41,6 +41,7 @@ typedef struct SMetaCache SMetaCache;
// metaOpen ================== // metaOpen ==================
int32_t metaRLock(SMeta* pMeta); int32_t metaRLock(SMeta* pMeta);
int32_t metaWLock(SMeta* pMeta); int32_t metaWLock(SMeta* pMeta);
int32_t metaCheckTtlTaskAndWLock(SMeta* pMeta);
int32_t metaULock(SMeta* pMeta); int32_t metaULock(SMeta* pMeta);
int32_t metaWaitTxnReadyAndWLock(SMeta* pMeta); int32_t metaWaitTxnReadyAndWLock(SMeta* pMeta);
int32_t metaULockAndPostTxnReady(SMeta* pMeta); int32_t metaULockAndPostTxnReady(SMeta* pMeta);
...@@ -79,7 +80,8 @@ int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int ...@@ -79,7 +80,8 @@ int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int
struct SMeta { struct SMeta {
TdThreadRwlock lock; TdThreadRwlock lock;
tsem_t txnReady; // vnode-write: wait in 'metaCommit' and post in 'metaBegin' tsem_t txnReady; // vnode-write: wait in 'metaCommit' and post in 'metaBegin'
tsem_t writerWaiting;
char* path; char* path;
SVnode* pVnode; SVnode* pVnode;
......
...@@ -48,7 +48,9 @@ int metaBegin(SMeta *pMeta, int8_t heap) { ...@@ -48,7 +48,9 @@ int metaBegin(SMeta *pMeta, int8_t heap) {
// commit the meta txn // commit the meta txn
TXN *metaGetTxn(SMeta *pMeta) { return pMeta->txn; } TXN *metaGetTxn(SMeta *pMeta) { return pMeta->txn; }
int metaCommit(SMeta *pMeta, TXN *txn) { int metaCommit(SMeta *pMeta, TXN *txn) {
tsem_wait(&pMeta->writerWaiting);
tsem_wait(&pMeta->txnReady); tsem_wait(&pMeta->txnReady);
tsem_post(&pMeta->writerWaiting);
ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn); ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
return tdbCommit(pMeta->pEnv, txn); return tdbCommit(pMeta->pEnv, txn);
......
...@@ -52,6 +52,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { ...@@ -52,6 +52,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
metaInitLock(pMeta); metaInitLock(pMeta);
tsem_init(&(pMeta->txnReady), 0, 0); tsem_init(&(pMeta->txnReady), 0, 0);
tsem_init(&(pMeta->writerWaiting), 0, 1);
pMeta->path = (char *)&pMeta[1]; pMeta->path = (char *)&pMeta[1];
strcpy(pMeta->path, path); strcpy(pMeta->path, path);
...@@ -224,7 +225,7 @@ int metaClose(SMeta **ppMeta) { ...@@ -224,7 +225,7 @@ int metaClose(SMeta **ppMeta) {
} }
int metaAlterCache(SMeta *pMeta, int32_t nPage) { int metaAlterCache(SMeta *pMeta, int32_t nPage) {
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
if (tdbAlter(pMeta->pEnv, nPage) < 0) { if (tdbAlter(pMeta->pEnv, nPage) < 0) {
metaULock(pMeta); metaULock(pMeta);
...@@ -255,6 +256,23 @@ int32_t metaWLock(SMeta *pMeta) { ...@@ -255,6 +256,23 @@ int32_t metaWLock(SMeta *pMeta) {
return ret; return ret;
} }
/*
 * Acquire the meta write lock, first passing through the `writerWaiting`
 * semaphore when the owning vnode reports that a TTL task is in progress.
 *
 * When `pMeta->pVnode->ttlTaskProcessing` is false this is just metaWLock().
 * When it is true, the caller waits on `writerWaiting` before calling
 * metaWLock() and posts the semaphore again right after that call returns.
 * NOTE(review): presumably this lets an ordinary writer get ahead of the
 * TTL drop loop / committer, which also wait on `writerWaiting` -- confirm.
 *
 * The flag is sampled exactly once, so the wait and the post are always
 * paired even if the flag changes while this function is running.
 *
 * Returns whatever metaWLock() returns.
 */
int32_t metaCheckTtlTaskAndWLock(SMeta *pMeta) {
  // No TTL task running: take the write lock directly.
  if (!pMeta->pVnode->ttlTaskProcessing) {
    return metaWLock(pMeta);
  }

  // TTL task running: hold the semaphore across the lock acquisition only.
  tsem_wait(&pMeta->writerWaiting);
  int32_t code = metaWLock(pMeta);
  tsem_post(&pMeta->writerWaiting);

  return code;
}
int32_t metaULock(SMeta *pMeta) { int32_t metaULock(SMeta *pMeta) {
int32_t ret = 0; int32_t ret = 0;
...@@ -268,6 +286,9 @@ int32_t metaULock(SMeta *pMeta) { ...@@ -268,6 +286,9 @@ int32_t metaULock(SMeta *pMeta) {
int32_t metaWaitTxnReadyAndWLock(SMeta *pMeta) { int32_t metaWaitTxnReadyAndWLock(SMeta *pMeta) {
int32_t ret = 0; int32_t ret = 0;
tsem_wait(&pMeta->writerWaiting);
tsem_post(&pMeta->writerWaiting);
tsem_wait(&pMeta->txnReady); tsem_wait(&pMeta->txnReady);
ret = metaWLock(pMeta); ret = metaWLock(pMeta);
......
...@@ -1465,7 +1465,7 @@ int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo, SMetaReader *pR ...@@ -1465,7 +1465,7 @@ int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo, SMetaReader *pR
} }
} }
// upsert the cache // upsert the cache
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
metaCacheUpsert(pMeta, pInfo); metaCacheUpsert(pMeta, pInfo);
metaULock(pMeta); metaULock(pMeta);
...@@ -1504,7 +1504,7 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables) { ...@@ -1504,7 +1504,7 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables) {
state.ctbNum = ctbNum; state.ctbNum = ctbNum;
// upsert the cache // upsert the cache
metaWLock(pVnodeObj->pMeta); metaCheckTtlTaskAndWLock(pVnodeObj->pMeta);
metaStatsCacheUpsert(pVnodeObj->pMeta, &state); metaStatsCacheUpsert(pVnodeObj->pMeta, &state);
metaULock(pVnodeObj->pMeta); metaULock(pVnodeObj->pMeta);
......
...@@ -147,7 +147,7 @@ static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -147,7 +147,7 @@ static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) {
} }
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) { static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) {
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
// save to table.db // save to table.db
if (metaSaveSmaToDB(pMeta, pME) < 0) goto _err; if (metaSaveSmaToDB(pMeta, pME) < 0) goto _err;
......
...@@ -269,7 +269,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb ...@@ -269,7 +269,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c); rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
if (rc < 0) { if (rc < 0) {
tdbTbcClose(pCtbIdxc); tdbTbcClose(pCtbIdxc);
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
goto _drop_super_table; goto _drop_super_table;
} }
...@@ -288,7 +288,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb ...@@ -288,7 +288,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
tdbTbcClose(pCtbIdxc); tdbTbcClose(pCtbIdxc);
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) { for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUidList, iChild); tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUidList, iChild);
...@@ -376,7 +376,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -376,7 +376,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
nStbEntry.stbEntry.schemaRow = pReq->schemaRow; nStbEntry.stbEntry.schemaRow = pReq->schemaRow;
nStbEntry.stbEntry.schemaTag = pReq->schemaTag; nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
// compare two entry // compare two entry
if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) { if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) {
metaSaveToSkmDb(pMeta, &nStbEntry); metaSaveToSkmDb(pMeta, &nStbEntry);
...@@ -519,7 +519,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -519,7 +519,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
goto _err; goto _err;
} }
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
metaULock(pMeta); metaULock(pMeta);
metaDestroyTagIdxKey(pTagIdxKey); metaDestroyTagIdxKey(pTagIdxKey);
...@@ -532,7 +532,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -532,7 +532,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
nStbEntry.stbEntry.schemaRow = pReq->schemaRow; nStbEntry.stbEntry.schemaRow = pReq->schemaRow;
nStbEntry.stbEntry.schemaTag = pReq->schemaTag; nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
// update table.db // update table.db
metaSaveToTbDb(pMeta, &nStbEntry); metaSaveToTbDb(pMeta, &nStbEntry);
// update uid index // update uid index
...@@ -649,7 +649,7 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) ...@@ -649,7 +649,7 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq)
goto _err; goto _err;
} }
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn); tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn);
metaULock(pMeta); metaULock(pMeta);
metaDestroyTagIdxKey(pTagIdxKey); metaDestroyTagIdxKey(pTagIdxKey);
...@@ -670,7 +670,7 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) ...@@ -670,7 +670,7 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq)
nStbEntry.stbEntry.schemaTag = *tag; nStbEntry.stbEntry.schemaTag = *tag;
nStbEntry.stbEntry.rsmaParam = oStbEntry.stbEntry.rsmaParam; nStbEntry.stbEntry.rsmaParam = oStbEntry.stbEntry.rsmaParam;
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
// update table.db // update table.db
metaSaveToTbDb(pMeta, &nStbEntry); metaSaveToTbDb(pMeta, &nStbEntry);
// update uid index // update uid index
...@@ -768,7 +768,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs ...@@ -768,7 +768,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
++pMeta->pVnode->config.vndStats.numOfCTables; ++pMeta->pVnode->config.vndStats.numOfCTables;
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1); metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1);
metaUidCacheClear(pMeta, me.ctbEntry.suid); metaUidCacheClear(pMeta, me.ctbEntry.suid);
metaTbGroupCacheClear(pMeta, me.ctbEntry.suid); metaTbGroupCacheClear(pMeta, me.ctbEntry.suid);
...@@ -826,7 +826,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi ...@@ -826,7 +826,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
} }
uid = *(tb_uid_t *)pData; uid = *(tb_uid_t *)pData;
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
metaDropTableByUid(pMeta, uid, &type); metaDropTableByUid(pMeta, uid, &type);
metaULock(pMeta); metaULock(pMeta);
...@@ -843,7 +843,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi ...@@ -843,7 +843,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
} }
static void metaDropTables(SMeta *pMeta, SArray *tbUids) { static void metaDropTables(SMeta *pMeta, SArray *tbUids) {
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
for (int i = 0; i < TARRAY_SIZE(tbUids); ++i) { for (int i = 0; i < TARRAY_SIZE(tbUids); ++i) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i); tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i);
metaDropTableByUid(pMeta, uid, NULL); metaDropTableByUid(pMeta, uid, NULL);
...@@ -954,14 +954,19 @@ int metaTtlDropTables(SMeta *pMeta, SArray *tbUids, bool* pShallAbort) { ...@@ -954,14 +954,19 @@ int metaTtlDropTables(SMeta *pMeta, SArray *tbUids, bool* pShallAbort) {
metaInfo("ttl find expired table count: %zu", TARRAY_SIZE(tbUids)); metaInfo("ttl find expired table count: %zu", TARRAY_SIZE(tbUids));
const int releaseLockRound = 50;
for (int i = 0; i < TARRAY_SIZE(tbUids); ++i) { for (int i = 0; i < TARRAY_SIZE(tbUids); ++i) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i); tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i);
if (*pShallAbort) break; if (*pShallAbort) break;
metaWaitTxnReadyAndWLock(pMeta); if (i % releaseLockRound == 0) {
metaWaitTxnReadyAndWLock(pMeta);
}
metaDropTableByUid(pMeta, uid, NULL); metaDropTableByUid(pMeta, uid, NULL);
metaULockAndPostTxnReady(pMeta); if (i % releaseLockRound == releaseLockRound - 1 || i == TARRAY_SIZE(tbUids) - 1) {
taosUsleep(1); metaULockAndPostTxnReady(pMeta);
//sched_yield();
}
} }
int64_t endMs = taosGetTimestampMs(); int64_t endMs = taosGetTimestampMs();
...@@ -1336,7 +1341,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl ...@@ -1336,7 +1341,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
entry.version = version; entry.version = version;
// do actual write // do actual write
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
metaDeleteNcolIdx(pMeta, &oldEntry); metaDeleteNcolIdx(pMeta, &oldEntry);
metaUpdateNcolIdx(pMeta, &entry); metaUpdateNcolIdx(pMeta, &entry);
...@@ -1514,7 +1519,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ...@@ -1514,7 +1519,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
taosArrayDestroy(pTagArray); taosArrayDestroy(pTagArray);
} }
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
// save to table.db // save to table.db
metaSaveToTbDb(pMeta, &ctbEntry); metaSaveToTbDb(pMeta, &ctbEntry);
...@@ -1624,7 +1629,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p ...@@ -1624,7 +1629,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
} }
entry.version = version; entry.version = version;
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
// build SMetaEntry // build SMetaEntry
if (entry.type == TSDB_CHILD_TABLE) { if (entry.type == TSDB_CHILD_TABLE) {
if (pAlterTbReq->updateTTL) { if (pAlterTbReq->updateTTL) {
...@@ -1889,7 +1894,7 @@ static int metaDropTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterT ...@@ -1889,7 +1894,7 @@ static int metaDropTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterT
} }
tdbTbcClose(pTagIdxc); tdbTbcClose(pTagIdxc);
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
for (int i = 0; i < taosArrayGetSize(tagIdxList); i++) { for (int i = 0; i < taosArrayGetSize(tagIdxList); i++) {
SMetaPair *pair = taosArrayGet(tagIdxList, i); SMetaPair *pair = taosArrayGet(tagIdxList, i);
tdbTbDelete(pMeta->pTagIdx, pair->key, pair->nkey, pMeta->txn); tdbTbDelete(pMeta->pTagIdx, pair->key, pair->nkey, pMeta->txn);
...@@ -2028,7 +2033,7 @@ static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs ...@@ -2028,7 +2033,7 @@ static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs
} }
int metaUpdateChangeTimeWithLock(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) { int metaUpdateChangeTimeWithLock(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
int ret = metaUpdateChangeTime(pMeta, uid, changeTimeMs); int ret = metaUpdateChangeTime(pMeta, uid, changeTimeMs);
metaULock(pMeta); metaULock(pMeta);
return ret; return ret;
...@@ -2209,7 +2214,7 @@ _exit: ...@@ -2209,7 +2214,7 @@ _exit:
int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) { int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
int32_t code = 0; int32_t code = 0;
int32_t line = 0; int32_t line = 0;
metaWLock(pMeta); metaCheckTtlTaskAndWLock(pMeta);
// save to table.db // save to table.db
code = metaSaveToTbDb(pMeta, pME); code = metaSaveToTbDb(pMeta, pME);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册