提交 93260417 编写于 作者: S Shengliang Guan

[TD-149] change hash imp in sdb

上级 b3a1070b
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "hash.h"
#include "trpc.h" #include "trpc.h"
#include "tutil.h" #include "tutil.h"
#include "tbalance.h" #include "tbalance.h"
...@@ -83,12 +84,6 @@ typedef struct { ...@@ -83,12 +84,6 @@ typedef struct {
} SSdbRow; } SSdbRow;
static SSdbObject tsSdbObj = {0}; static SSdbObject tsSdbObj = {0};
static void *(*sdbInitIndexFp[])(int32_t maxRows, int32_t dataSize) = {sdbOpenStrHash, sdbOpenIntHash, sdbOpenIntHash};
static void *(*sdbAddIndexFp[])(void *handle, void *key, void *data) = {sdbAddStrHash, sdbAddIntHash, sdbAddIntHash};
static void (*sdbDeleteIndexFp[])(void *handle, void *key) = {sdbDeleteStrHash, sdbDeleteIntHash, sdbDeleteIntHash};
static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, sdbGetIntHashData, sdbGetIntHashData};
static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash};
static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData};
static int sdbWrite(void *param, void *data, int type); static int sdbWrite(void *param, void *data, int type);
int32_t sdbGetId(void *handle) { int32_t sdbGetId(void *handle) {
...@@ -375,7 +370,11 @@ static SSdbRow *sdbGetRowMeta(void *handle, void *key) { ...@@ -375,7 +370,11 @@ static SSdbRow *sdbGetRowMeta(void *handle, void *key) {
if (handle == NULL) return NULL; if (handle == NULL) return NULL;
pMeta = (*sdbGetIndexFp[pTable->keyType])(pTable->iHandle, key); int32_t keySize = sizeof(int32_t);
if (pTable->keyType == SDB_KEY_STRING) {
keySize = strlen((char *)key);
}
pMeta = taosHashGet(pTable->iHandle, key, keySize);
return pMeta; return pMeta;
} }
...@@ -387,13 +386,17 @@ void *sdbGetRow(void *handle, void *key) { ...@@ -387,13 +386,17 @@ void *sdbGetRow(void *handle, void *key) {
if (handle == NULL) return NULL; if (handle == NULL) return NULL;
pthread_mutex_lock(&pTable->mutex); pthread_mutex_lock(&pTable->mutex);
pMeta = (*sdbGetIndexFp[pTable->keyType])(pTable->iHandle, key);
int32_t keySize = sizeof(int32_t);
if (pTable->keyType == SDB_KEY_STRING) {
keySize = strlen((char *)key);
}
pMeta = taosHashGet(pTable->iHandle, key, keySize);
if (pMeta) sdbIncRef(pTable, pMeta->row); if (pMeta) sdbIncRef(pTable, pMeta->row);
pthread_mutex_unlock(&pTable->mutex); pthread_mutex_unlock(&pTable->mutex);
if (pMeta == NULL) { if (pMeta == NULL) return NULL;
return NULL;
}
return pMeta->row; return pMeta->row;
} }
...@@ -404,7 +407,13 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { ...@@ -404,7 +407,13 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
rowMeta.row = pOper->pObj; rowMeta.row = pOper->pObj;
pthread_mutex_lock(&pTable->mutex); pthread_mutex_lock(&pTable->mutex);
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj, &rowMeta);
int32_t keySize = sizeof(int32_t);
if (pTable->keyType == SDB_KEY_STRING) {
keySize = strlen((char *)pOper->pObj);
}
taosHashPut(pTable->iHandle, pOper->pObj, keySize, &rowMeta, sizeof(SSdbRow));
sdbIncRef(pTable, pOper->pObj); sdbIncRef(pTable, pOper->pObj);
pTable->numOfRows++; pTable->numOfRows++;
...@@ -427,7 +436,13 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) { ...@@ -427,7 +436,13 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
(*pTable->deleteFp)(pOper); (*pTable->deleteFp)(pOper);
pthread_mutex_lock(&pTable->mutex); pthread_mutex_lock(&pTable->mutex);
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj);
int32_t keySize = sizeof(int32_t);
if (pTable->keyType == SDB_KEY_STRING) {
keySize = strlen((char *)pOper->pObj);
}
taosHashRemove(pTable->iHandle, pOper->pObj, keySize);
pTable->numOfRows--; pTable->numOfRows--;
pthread_mutex_unlock(&pTable->mutex); pthread_mutex_unlock(&pTable->mutex);
...@@ -637,18 +652,26 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { ...@@ -637,18 +652,26 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
SSdbTable *pTable = (SSdbTable *)handle; SSdbTable *pTable = (SSdbTable *)handle;
SSdbRow * pMeta;
*ppRow = NULL; *ppRow = NULL;
if (pTable == NULL) return NULL; if (pTable == NULL) return NULL;
pNode = (*sdbFetchRowFp[pTable->keyType])(pTable->iHandle, pNode, (void **)&pMeta); SHashMutableIterator *pIter = pNode;
if (pIter == NULL) {
pIter = taosHashCreateIter(pTable->iHandle);
}
if (!taosHashIterNext(pIter)) {
taosHashDestroyIter(pIter);
return NULL;
}
SSdbRow *pMeta = taosHashIterGet(pIter);
if (pMeta == NULL) return NULL; if (pMeta == NULL) return NULL;
*ppRow = pMeta->row; *ppRow = pMeta->row;
sdbIncRef(handle, pMeta->row); sdbIncRef(handle, pMeta->row);
return pNode; return pIter;
} }
void *sdbOpenTable(SSdbTableDesc *pDesc) { void *sdbOpenTable(SSdbTableDesc *pDesc) {
...@@ -669,10 +692,12 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { ...@@ -669,10 +692,12 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
pTable->decodeFp = pDesc->decodeFp; pTable->decodeFp = pDesc->decodeFp;
pTable->destroyFp = pDesc->destroyFp; pTable->destroyFp = pDesc->destroyFp;
pTable->restoredFp = pDesc->restoredFp; pTable->restoredFp = pDesc->restoredFp;
if (sdbInitIndexFp[pTable->keyType] != NULL) { _hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
pTable->iHandle = (*sdbInitIndexFp[pTable->keyType])(pTable->maxRowSize, sizeof(SSdbRow)); if (pTable->keyType == SDB_KEY_STRING) {
hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
} }
pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true);
pthread_mutex_init(&pTable->mutex, NULL); pthread_mutex_init(&pTable->mutex, NULL);
...@@ -688,11 +713,10 @@ void sdbCloseTable(void *handle) { ...@@ -688,11 +713,10 @@ void sdbCloseTable(void *handle) {
tsSdbObj.numOfTables--; tsSdbObj.numOfTables--;
tsSdbObj.tableList[pTable->tableId] = NULL; tsSdbObj.tableList[pTable->tableId] = NULL;
void *pNode = NULL; SHashMutableIterator *pIter = taosHashCreateIter(pTable->iHandle);
while (1) { while (taosHashIterNext(pIter)) {
SSdbRow *pMeta; SSdbRow *pMeta = taosHashIterGet(pIter);
pNode = (*sdbFetchRowFp[pTable->keyType])(pTable->iHandle, pNode, (void **)&pMeta); if (pMeta == NULL) continue;
if (pMeta == NULL) break;
SSdbOper oper = { SSdbOper oper = {
.pObj = pMeta->row, .pObj = pMeta->row,
...@@ -702,9 +726,8 @@ void sdbCloseTable(void *handle) { ...@@ -702,9 +726,8 @@ void sdbCloseTable(void *handle) {
(*pTable->destroyFp)(&oper); (*pTable->destroyFp)(&oper);
} }
if (sdbCleanUpIndexFp[pTable->keyType]) { taosHashDestroyIter(pIter);
(*sdbCleanUpIndexFp[pTable->keyType])(pTable->iHandle); taosHashCleanup(pTable->iHandle);
}
pthread_mutex_destroy(&pTable->mutex); pthread_mutex_destroy(&pTable->mutex);
......
...@@ -812,6 +812,7 @@ static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { ...@@ -812,6 +812,7 @@ static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) {
dnodeSendMsgToDnode(&ipSet, &rpcMsg); dnodeSendMsgToDnode(&ipSet, &rpcMsg);
mgmtDecVgroupRef(pVgroup); mgmtDecVgroupRef(pVgroup);
} }
taosHashDestroyIter(pIter);
mgmtDropAllChildTablesInStable(pStable); mgmtDropAllChildTablesInStable(pStable);
} }
...@@ -1288,6 +1289,8 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { ...@@ -1288,6 +1289,8 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
mgmtDecVgroupRef(pVgroup); mgmtDecVgroupRef(pVgroup);
} }
taosHashDestroyIter(pIter);
pVgroupInfo->numOfVgroups = htonl(vgSize); pVgroupInfo->numOfVgroups = htonl(vgSize);
// one table is done, try the next table // one table is done, try the next table
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册