提交 2586095f 编写于 作者: S Shengliang Guan

[TD-705] remove table lock in sdb

上级 4ac333b3
...@@ -63,7 +63,6 @@ typedef struct _SSdbTable { ...@@ -63,7 +63,6 @@ typedef struct _SSdbTable {
int32_t (*encodeFp)(SSdbOper *pOper); int32_t (*encodeFp)(SSdbOper *pOper);
int32_t (*destroyFp)(SSdbOper *pOper); int32_t (*destroyFp)(SSdbOper *pOper);
int32_t (*restoredFp)(); int32_t (*restoredFp)();
pthread_mutex_t mutex;
} SSdbTable; } SSdbTable;
typedef struct { typedef struct {
...@@ -429,24 +428,18 @@ static SSdbRow *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) { ...@@ -429,24 +428,18 @@ static SSdbRow *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) {
void *sdbGetRow(void *handle, void *key) { void *sdbGetRow(void *handle, void *key) {
SSdbTable *pTable = (SSdbTable *)handle; SSdbTable *pTable = (SSdbTable *)handle;
SSdbRow * pMeta;
if (handle == NULL) return NULL;
pthread_mutex_lock(&pTable->mutex);
int32_t keySize = sizeof(int32_t); int32_t keySize = sizeof(int32_t);
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
keySize = strlen((char *)key); keySize = strlen((char *)key);
} }
pMeta = taosHashGet(pTable->iHandle, key, keySize);
SSdbRow *pMeta = taosHashGet(pTable->iHandle, key, keySize);
if (pMeta) sdbIncRef(pTable, pMeta->row); if (pMeta) {
pthread_mutex_unlock(&pTable->mutex); sdbIncRef(pTable, pMeta->row);
return pMeta->row;
if (pMeta == NULL) return NULL; } else {
return NULL;
return pMeta->row; }
} }
static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) { static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) {
...@@ -458,8 +451,6 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { ...@@ -458,8 +451,6 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
rowMeta.rowSize = pOper->rowSize; rowMeta.rowSize = pOper->rowSize;
rowMeta.row = pOper->pObj; rowMeta.row = pOper->pObj;
pthread_mutex_lock(&pTable->mutex);
void * key = sdbGetObjKey(pTable, pOper->pObj); void * key = sdbGetObjKey(pTable, pOper->pObj);
int32_t keySize = sizeof(int32_t); int32_t keySize = sizeof(int32_t);
...@@ -470,16 +461,12 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { ...@@ -470,16 +461,12 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
taosHashPut(pTable->iHandle, key, keySize, &rowMeta, sizeof(SSdbRow)); taosHashPut(pTable->iHandle, key, keySize, &rowMeta, sizeof(SSdbRow));
sdbIncRef(pTable, pOper->pObj); sdbIncRef(pTable, pOper->pObj);
pTable->numOfRows++; atomic_add_fetch_32(&pTable->numOfRows, 1);
if (pTable->keyType == SDB_KEY_AUTO) { if (pTable->tableId == SDB_TABLE_ACCOUNT) {
pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pOper->pObj)); atomic_add_fetch_32(&pTable->autoIndex, 1);
} else {
pTable->autoIndex++;
} }
pthread_mutex_unlock(&pTable->mutex);
sdbTrace("table:%s, insert record:%s to hash, rowSize:%d numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName, sdbTrace("table:%s, insert record:%s to hash, rowSize:%d numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, sdbGetVersion()); sdbGetKeyStrFromObj(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, sdbGetVersion());
...@@ -490,20 +477,15 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { ...@@ -490,20 +477,15 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) { static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
(*pTable->deleteFp)(pOper); (*pTable->deleteFp)(pOper);
pthread_mutex_lock(&pTable->mutex);
void * key = sdbGetObjKey(pTable, pOper->pObj); void * key = sdbGetObjKey(pTable, pOper->pObj);
int32_t keySize = sizeof(int32_t); int32_t keySize = sizeof(int32_t);
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
keySize = strlen((char *)key); keySize = strlen((char *)key);
} }
taosHashRemove(pTable->iHandle, key, keySize); taosHashRemove(pTable->iHandle, key, keySize);
atomic_sub_fetch_32(&pTable->numOfRows, 1);
pTable->numOfRows--;
pthread_mutex_unlock(&pTable->mutex);
sdbTrace("table:%s, delete record:%s from hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName, sdbTrace("table:%s, delete record:%s from hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion()); sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion());
...@@ -608,14 +590,12 @@ int32_t sdbInsertRow(SSdbOper *pOper) { ...@@ -608,14 +590,12 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
} }
if (pTable->keyType == SDB_KEY_AUTO) { if (pTable->keyType == SDB_KEY_AUTO) {
pthread_mutex_lock(&pTable->mutex); *((uint32_t *)pOper->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1);
*((uint32_t *)pOper->pObj) = ++pTable->autoIndex;
// let vgId increase from 2 // let vgId increase from 2
if (pTable->autoIndex == 1 && strcmp(pTable->tableName, "vgroups") == 0) { if (pTable->autoIndex == 1 && strcmp(pTable->tableName, "vgroups") == 0) {
*((uint32_t *)pOper->pObj) = ++pTable->autoIndex; *((uint32_t *)pOper->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1);
} }
pthread_mutex_unlock(&pTable->mutex);
} }
int32_t code = sdbInsertHash(pTable, pOper); int32_t code = sdbInsertHash(pTable, pOper);
...@@ -805,8 +785,6 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { ...@@ -805,8 +785,6 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
} }
pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true); pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true);
pthread_mutex_init(&pTable->mutex, NULL);
tsSdbObj.numOfTables++; tsSdbObj.numOfTables++;
tsSdbObj.tableList[pTable->tableId] = pTable; tsSdbObj.tableList[pTable->tableId] = pTable;
return pTable; return pTable;
...@@ -835,8 +813,6 @@ void sdbCloseTable(void *handle) { ...@@ -835,8 +813,6 @@ void sdbCloseTable(void *handle) {
taosHashDestroyIter(pIter); taosHashDestroyIter(pIter);
taosHashCleanup(pTable->iHandle); taosHashCleanup(pTable->iHandle);
pthread_mutex_destroy(&pTable->mutex);
sdbTrace("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbObj.numOfTables); sdbTrace("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbObj.numOfTables);
free(pTable); free(pTable);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册