提交 00080750 编写于 作者: S Shengliang Guan

fix: mnode may deadlock while drop another mnode if it crashed

上级 8fe672fe
...@@ -403,6 +403,10 @@ const char *sdbStatusName(ESdbStatus status); ...@@ -403,6 +403,10 @@ const char *sdbStatusName(ESdbStatus status);
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
int32_t sdbGetIdFromRaw(SSdb *pSdb, SSdbRaw *pRaw); int32_t sdbGetIdFromRaw(SSdb *pSdb, SSdbRaw *pRaw);
void sdbWriteLock(SSdb *pSdb, int32_t type);
void sdbReadLock(SSdb *pSdb, int32_t type);
void sdbUnLock(SSdb *pSdb, int32_t type);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -181,3 +181,23 @@ void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config ...@@ -181,3 +181,23 @@ void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config
pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, *index, *term, *config); pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, *index, *term, *config);
#endif #endif
} }
void sdbWriteLock(SSdb *pSdb, int32_t type) {
TdThreadRwlock *pLock = &pSdb->locks[type];
// mTrace("sdb table:%d start write lock:%p", type, pLock);
taosThreadRwlockWrlock(pLock);
// mTrace("sdb table:%d stop write lock:%p", type, pLock);
}
void sdbReadLock(SSdb *pSdb, int32_t type) {
TdThreadRwlock *pLock = &pSdb->locks[type];
// mTrace("sdb table:%d start read lock:%p", type, pLock);
taosThreadRwlockRdlock(pLock);
// mTrace("sdb table:%d stop read lock:%p", type, pLock);
}
void sdbUnLock(SSdb *pSdb, int32_t type) {
TdThreadRwlock *pLock = &pSdb->locks[type];
// mTrace("sdb table:%d unlock:%p", type, pLock);
taosThreadRwlockUnlock(pLock);
}
...@@ -363,9 +363,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -363,9 +363,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
mInfo("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); mInfo("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
SHashObj *hash = pSdb->hashObjs[i]; SHashObj *hash = pSdb->hashObjs[i];
TdThreadRwlock *pLock = &pSdb->locks[i]; sdbWriteLock(pSdb, i);
taosThreadRwlockWrlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL); SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) { while (ppRow != NULL) {
...@@ -410,7 +409,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -410,7 +409,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
ppRow = taosHashIterate(hash, ppRow); ppRow = taosHashIterate(hash, ppRow);
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, i);
} }
if (code == 0) { if (code == 0) {
......
...@@ -133,12 +133,12 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) { ...@@ -133,12 +133,12 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) {
} }
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; int32_t type = pRow->type;
taosThreadRwlockWrlock(pLock); sdbWriteLock(pSdb, type);
SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (pOldRow != NULL) { if (pOldRow != NULL) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
return terrno; return terrno;
...@@ -149,7 +149,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -149,7 +149,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
sdbPrintOper(pSdb, pRow, "insert"); sdbPrintOper(pSdb, pRow, "insert");
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return terrno;
...@@ -164,12 +164,12 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -164,12 +164,12 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosHashRemove(hash, pRow->pObj, keySize); taosHashRemove(hash, pRow->pObj, keySize);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = code; terrno = code;
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
return terrno; return terrno;
} }
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) { if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) {
pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj)); pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
...@@ -183,26 +183,27 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -183,26 +183,27 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
} }
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) { static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
TdThreadRwlock *pLock = &pSdb->locks[pNewRow->type]; int32_t type = pNewRow->type;
taosThreadRwlockWrlock(pLock); sdbWriteLock(pSdb, type);
SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize); SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) { if (ppOldRow == NULL || *ppOldRow == NULL) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize); return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
} }
SSdbRow *pOldRow = *ppOldRow; SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status; pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "update"); sdbPrintOper(pSdb, pOldRow, "update");
sdbUnLock(pSdb, type);
int32_t code = 0; int32_t code = 0;
SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type]; SdbUpdateFp updateFp = pSdb->updateFps[type];
if (updateFp != NULL) { if (updateFp != NULL) {
code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj); code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
sdbFreeRow(pSdb, pNewRow, false); sdbFreeRow(pSdb, pNewRow, false);
pSdb->tableVer[pOldRow->type]++; pSdb->tableVer[pOldRow->type]++;
...@@ -210,12 +211,12 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -210,12 +211,12 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
} }
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; int32_t type = pRow->type;
taosThreadRwlockWrlock(pLock); sdbWriteLock(pSdb, type);
SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) { if (ppOldRow == NULL || *ppOldRow == NULL) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return terrno; return terrno;
...@@ -228,7 +229,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -228,7 +229,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosHashRemove(hash, pOldRow->pObj, keySize); taosHashRemove(hash, pOldRow->pObj, keySize);
pSdb->tableVer[pOldRow->type]++; pSdb->tableVer[pOldRow->type]++;
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
...@@ -282,12 +283,11 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { ...@@ -282,12 +283,11 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
void *pRet = NULL; void *pRet = NULL;
int32_t keySize = sdbGetkeySize(pSdb, type, pKey); int32_t keySize = sdbGetkeySize(pSdb, type, pKey);
TdThreadRwlock *pLock = &pSdb->locks[type]; sdbReadLock(pSdb, type);
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashGet(hash, pKey, keySize); SSdbRow **ppRow = taosHashGet(hash, pKey, keySize);
if (ppRow == NULL || *ppRow == NULL) { if (ppRow == NULL || *ppRow == NULL) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return NULL; return NULL;
} }
...@@ -310,13 +310,13 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { ...@@ -310,13 +310,13 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
break; break;
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
return pRet; return pRet;
} }
static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; int32_t type = pRow->type;
taosThreadRwlockWrlock(pLock); sdbWriteLock(pSdb, type);
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "check"); sdbPrintOper(pSdb, pRow, "check");
...@@ -324,7 +324,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { ...@@ -324,7 +324,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
sdbFreeRow(pSdb, pRow, true); sdbFreeRow(pSdb, pRow, true);
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
} }
void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) { void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) {
...@@ -333,9 +333,9 @@ void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) { ...@@ -333,9 +333,9 @@ void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) {
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
if (pRow->type >= SDB_MAX) return; if (pRow->type >= SDB_MAX) return;
TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; int32_t type = pRow->type;
if (lock) { if (lock) {
taosThreadRwlockWrlock(pLock); sdbWriteLock(pSdb, type);
} }
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
...@@ -345,7 +345,7 @@ void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) { ...@@ -345,7 +345,7 @@ void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) {
} }
if (lock) { if (lock) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
} }
} }
...@@ -357,8 +357,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { ...@@ -357,8 +357,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return NULL; if (hash == NULL) return NULL;
TdThreadRwlock *pLock = &pSdb->locks[type]; sdbReadLock(pSdb, type);
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, pIter); SSdbRow **ppRow = taosHashIterate(hash, pIter);
while (ppRow != NULL) { while (ppRow != NULL) {
...@@ -373,7 +372,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { ...@@ -373,7 +372,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
*ppObj = pRow->pObj; *ppObj = pRow->pObj;
break; break;
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
return ppRow; return ppRow;
} }
...@@ -384,9 +383,8 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat ...@@ -384,9 +383,8 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return NULL; if (hash == NULL) return NULL;
TdThreadRwlock *pLock = &pSdb->locks[type];
if (lock) { if (lock) {
taosThreadRwlockRdlock(pLock); sdbReadLock(pSdb, type);
} }
SSdbRow **ppRow = taosHashIterate(hash, pIter); SSdbRow **ppRow = taosHashIterate(hash, pIter);
...@@ -404,7 +402,7 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat ...@@ -404,7 +402,7 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat
break; break;
} }
if (lock) { if (lock) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
} }
return ppRow; return ppRow;
...@@ -416,18 +414,17 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) { ...@@ -416,18 +414,17 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) {
SHashObj *hash = sdbGetHash(pSdb, pRow->type); SHashObj *hash = sdbGetHash(pSdb, pRow->type);
if (hash == NULL) return; if (hash == NULL) return;
TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; int32_t type = pRow->type;
taosThreadRwlockRdlock(pLock); sdbReadLock(pSdb, type);
taosHashCancelIterate(hash, pIter); taosHashCancelIterate(hash, pIter);
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
} }
void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) { void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) {
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return; if (hash == NULL) return;
TdThreadRwlock *pLock = &pSdb->locks[type]; sdbReadLock(pSdb, type);
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL); SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) { while (ppRow != NULL) {
...@@ -443,17 +440,16 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2 ...@@ -443,17 +440,16 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2
ppRow = taosHashIterate(hash, ppRow); ppRow = taosHashIterate(hash, ppRow);
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
} }
int32_t sdbGetSize(SSdb *pSdb, ESdbType type) { int32_t sdbGetSize(SSdb *pSdb, ESdbType type) {
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return 0; if (hash == NULL) return 0;
TdThreadRwlock *pLock = &pSdb->locks[type]; sdbReadLock(pSdb, type);
taosThreadRwlockRdlock(pLock);
int32_t size = taosHashGetSize(hash); int32_t size = taosHashGetSize(hash);
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
return size; return size;
} }
...@@ -465,9 +461,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) { ...@@ -465,9 +461,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
if (pSdb->keyTypes[type] != SDB_KEY_INT32) return -1; if (pSdb->keyTypes[type] != SDB_KEY_INT32) return -1;
int32_t maxId = 0; int32_t maxId = 0;
sdbReadLock(pSdb, type);
TdThreadRwlock *pLock = &pSdb->locks[type];
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL); SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) { while (ppRow != NULL) {
...@@ -477,8 +471,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) { ...@@ -477,8 +471,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
ppRow = taosHashIterate(hash, ppRow); ppRow = taosHashIterate(hash, ppRow);
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
maxId = TMAX(maxId, pSdb->maxId[type]); maxId = TMAX(maxId, pSdb->maxId[type]);
return maxId + 1; return maxId + 1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册