提交 5fdaceb8 编写于 作者: S Shengliang Guan

refactor: adjust lock in sdb

上级 fe7f246c
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "sdbInt.h" #include "sdbInt.h"
static void sdbCheck(SSdb *pSdb, SSdbRow *pRow); static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow);
const char *sdbTableName(ESdbType type) { const char *sdbTableName(ESdbType type) {
switch (type) { switch (type) {
...@@ -65,7 +65,7 @@ const char *sdbTableName(ESdbType type) { ...@@ -65,7 +65,7 @@ const char *sdbTableName(ESdbType type) {
} }
} }
static const char *sdbStatusStr(ESdbStatus status) { static const char *sdbStatusName(ESdbStatus status) {
switch (status) { switch (status) {
case SDB_STATUS_CREATING: case SDB_STATUS_CREATING:
return "creating"; return "creating";
...@@ -89,13 +89,13 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) { ...@@ -89,13 +89,13 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
if (keyType == SDB_KEY_BINARY) { if (keyType == SDB_KEY_BINARY) {
mTrace("%s:%s, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount, oper, mTrace("%s:%s, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount, oper,
pRow->pObj, sdbStatusStr(pRow->status)); pRow->pObj, sdbStatusName(pRow->status));
} else if (keyType == SDB_KEY_INT32) { } else if (keyType == SDB_KEY_INT32) {
mTrace("%s:%d, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj, pRow->refCount, mTrace("%s:%d, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj, pRow->refCount,
oper, pRow->pObj, sdbStatusStr(pRow->status)); oper, pRow->pObj, sdbStatusName(pRow->status));
} else if (keyType == SDB_KEY_INT64) { } else if (keyType == SDB_KEY_INT64) {
mTrace("%s:%" PRId64 ", ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int64_t *)pRow->pObj, mTrace("%s:%" PRId64 ", ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int64_t *)pRow->pObj,
pRow->refCount, oper, pRow->pObj, sdbStatusStr(pRow->status)); pRow->refCount, oper, pRow->pObj, sdbStatusName(pRow->status));
} else { } else {
} }
} }
...@@ -116,7 +116,7 @@ static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) { ...@@ -116,7 +116,7 @@ static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
} }
static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) { static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) {
int32_t keySize; int32_t keySize = 0;
EKeyType keyType = pSdb->keyTypes[type]; EKeyType keyType = pSdb->keyTypes[type];
if (keyType == SDB_KEY_INT32) { if (keyType == SDB_KEY_INT32) {
...@@ -149,7 +149,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -149,7 +149,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return terrno;
} }
...@@ -183,18 +183,18 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -183,18 +183,18 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) { static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
SRWLatch *pLock = &pSdb->locks[pNewRow->type]; SRWLatch *pLock = &pSdb->locks[pNewRow->type];
taosRLockLatch(pLock); taosWLockLatch(pLock);
SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize); SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) { if (ppOldRow == NULL || *ppOldRow == NULL) {
taosRUnLockLatch(pLock); taosWUnLockLatch(pLock);
return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize); return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
} }
SSdbRow *pOldRow = *ppOldRow; SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status; pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "update"); sdbPrintOper(pSdb, pOldRow, "update");
taosRUnLockLatch(pLock); taosWUnLockLatch(pLock);
int32_t code = 0; int32_t code = 0;
SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type]; SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type];
...@@ -230,7 +230,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -230,7 +230,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
pSdb->tableVer[pOldRow->type]++; pSdb->tableVer[pOldRow->type]++;
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
sdbCheck(pSdb, pOldRow); sdbCheckRow(pSdb, pOldRow);
return 0; return 0;
} }
...@@ -314,9 +314,9 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { ...@@ -314,9 +314,9 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
return pRet; return pRet;
} }
static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) { static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
SRWLatch *pLock = &pSdb->locks[pRow->type]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock); taosWLockLatch(pLock);
int32_t ref = atomic_load_32(&pRow->refCount); int32_t ref = atomic_load_32(&pRow->refCount);
sdbPrintOper(pSdb, pRow, "check"); sdbPrintOper(pSdb, pRow, "check");
...@@ -324,7 +324,7 @@ static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) { ...@@ -324,7 +324,7 @@ static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) {
sdbFreeRow(pSdb, pRow, true); sdbFreeRow(pSdb, pRow, true);
} }
taosRUnLockLatch(pLock); taosWUnLockLatch(pLock);
} }
void sdbRelease(SSdb *pSdb, void *pObj) { void sdbRelease(SSdb *pSdb, void *pObj) {
...@@ -334,7 +334,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) { ...@@ -334,7 +334,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
if (pRow->type >= SDB_MAX) return; if (pRow->type >= SDB_MAX) return;
SRWLatch *pLock = &pSdb->locks[pRow->type]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock); taosWLockLatch(pLock);
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "release"); sdbPrintOper(pSdb, pRow, "release");
...@@ -342,7 +342,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) { ...@@ -342,7 +342,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
sdbFreeRow(pSdb, pRow, true); sdbFreeRow(pSdb, pRow, true);
} }
taosRUnLockLatch(pLock); taosWUnLockLatch(pLock);
} }
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
...@@ -354,16 +354,6 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { ...@@ -354,16 +354,6 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
SRWLatch *pLock = &pSdb->locks[type]; SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock); taosRLockLatch(pLock);
#if 0
if (pIter != NULL) {
SSdbRow *pLastRow = *(SSdbRow **)pIter;
int32_t ref = atomic_load_32(&pLastRow->refCount);
if (ref <= 0 && pLastRow->status == SDB_STATUS_DROPPED) {
sdbFreeRow(pSdb, pLastRow);
}
}
#endif
SSdbRow **ppRow = taosHashIterate(hash, pIter); SSdbRow **ppRow = taosHashIterate(hash, pIter);
while (ppRow != NULL) { while (ppRow != NULL) {
SSdbRow *pRow = *ppRow; SSdbRow *pRow = *ppRow;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册