提交 ce4a2802 编写于 作者: H Haojun Liao

[td-225] update the hash func

上级 b52bae71
......@@ -1039,7 +1039,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
if (NULL == pCmd->pTableList) {
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES);
if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
......
......@@ -646,7 +646,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, 0);
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
size_t total = taosArrayGetSize(pTableDataBlockList);
......
......@@ -814,7 +814,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
}
pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true);
pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true, true);
tsSdbObj.numOfTables++;
tsSdbObj.tableList[pTable->tableId] = pTable;
......
......@@ -378,7 +378,7 @@ static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCt
atomic_add_fetch_32(&pStable->numOfTables, 1);
if (pStable->vgHash == NULL) {
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
}
if (pStable->vgHash != NULL) {
......
......@@ -5841,7 +5841,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQInfo->tableqinfoGroupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
pQInfo->tableqinfoGroupInfo.numOfTables = pTableGroupInfo->numOfTables;
pQInfo->tableqinfoGroupInfo.map = taosHashInit(pTableGroupInfo->numOfTables,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
}
int tableIndex = 0;
......
......@@ -34,9 +34,9 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t ro
pResBuf->lruList = tdListNew(POINTER_BYTES);
// init id hash table
pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES
pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
char path[PATH_MAX] = {0};
getTmpfilePath("qbuf", path);
......
......@@ -256,7 +256,7 @@ static void* KeywordHashTable = NULL;
static void doInitKeywordsTable() {
int numOfEntries = tListLen(keywordTable);
KeywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, false);
KeywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, false);
for (int32_t i = 0; i < numOfEntries; i++) {
keywordTable[i].len = strlen(keywordTable[i].name);
void* ptr = &keywordTable[i];
......
......@@ -40,7 +40,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo->type = type;
_hash_fn_t fn = taosGetDefaultHashFunction(type);
pWindowResInfo->hashList = taosHashInit(threshold, fn, false);
pWindowResInfo->hashList = taosHashInit(threshold, fn, true, false);
if (pWindowResInfo->hashList == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
......@@ -107,7 +107,7 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR
pWindowResInfo->size = 0;
_hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type);
pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, false);
pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, true, false);
pWindowResInfo->startTime = TSKEY_INITIAL_VAL;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
......
......@@ -261,7 +261,7 @@ void *rpcOpen(const SRpcInit *pInit) {
}
if (pRpc->connType == TAOS_CONN_SERVER) {
pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true);
if (pRpc->hash == NULL) {
tError("%s failed to init string hash", pRpc->label);
rpcClose(pRpc);
......
......@@ -96,7 +96,7 @@ static void syncModuleInitFunc() {
return;
}
vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true);
vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
if (vgIdHash == NULL) {
taosTmrCleanUp(syncTmrCtrl);
taosCloseTcpThreadPool(tsTcpPool);
......
......@@ -436,7 +436,7 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
goto _err;
}
pMeta->uidMap = taosHashInit(TSDB_INIT_NTABLES * 1.1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
pMeta->uidMap = taosHashInit(TSDB_INIT_NTABLES * 1.1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pMeta->uidMap == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
......
......@@ -44,6 +44,7 @@ typedef struct SHashObj {
_hash_fn_t hashFp; // hash function
_hash_free_fn_t freeFp; // hash node free callback function
bool enableUpdate; // enable update
#if defined(LINUX)
pthread_rwlock_t *lock;
#else
......@@ -67,7 +68,7 @@ typedef struct SHashMutableIterator {
* @param threadsafe thread safe or not
* @return
*/
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe);
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, bool threadsafe);
/**
* return the size of hash table
......@@ -105,7 +106,8 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
*/
int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen);
void taosHashRemoveNode();
void* taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen);
/**
* clean up hash table
......
......@@ -163,7 +163,7 @@ static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode);
*/
static SHashNode *getNextHashNode(SHashMutableIterator *pIter);
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, bool threadsafe) {
if (capacity == 0 || fn == NULL) {
return NULL;
}
......@@ -179,6 +179,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
assert((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
pHashObj->hashFp = fn;
pHashObj->enableUpdate = update;
pHashObj->hashList = (SHashNode **)calloc(pHashObj->capacity, POINTER_BYTES);
if (pHashObj->hashList == NULL) {
......@@ -232,15 +233,21 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
doAddToHashTable(pHashObj, pNewNode);
__unlock(pHashObj->lock);
return 0;
} else {
doUpdateHashNode(pNode, pNewNode);
// not support the update operation, return error
if (pHashObj->enableUpdate) {
doUpdateHashNode(pNode, pNewNode);
}
__unlock(pHashObj->lock);
tfree(pNewNode->data)
tfree(pNewNode);
}
return 0;
return pHashObj->enableUpdate? 0:-1;
}
}
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
......@@ -288,10 +295,45 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
pNode->next = NULL;
pNode->prev = NULL;
tfree(pNode->data);
tfree(pNode);
return 0;
}
void* taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen) {
uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen);
__wr_lock(pHashObj->lock);
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, hashVal);
if (pNode == NULL) {
__unlock(pHashObj->lock);
return NULL;
}
SHashNode *pNext = pNode->next;
if (pNode->prev == NULL) {
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
assert(pHashObj->hashList[slot] == pNode);
pHashObj->hashList[slot] = pNext;
} else {
pNode->prev->next = pNext;
}
if (pNext != NULL) {
pNext->prev = pNode->prev;
}
pHashObj->size -= 1;
__unlock(pHashObj->lock);
pNode->next = NULL;
pNode->prev = NULL;
return pNode;
}
void taosHashCleanup(SHashObj *pHashObj) {
if (pHashObj == NULL) return;
......
......@@ -207,7 +207,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
return NULL;
}
pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), true);
pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false, true);
pCacheObj->name = strdup(cacheName);
if (pCacheObj->pHashTable == NULL) {
free(pCacheObj);
......@@ -249,45 +249,49 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
return NULL;
}
__cache_wr_lock(pCacheObj);
SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL;
if (pOld == NULL) { // do addedTime to cache
T_REF_INC(pNode1);
taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
__cache_unlock(pCacheObj);
// __cache_wr_lock(pCacheObj);
T_REF_INC(pNode1);
int32_t succ = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
if (succ == 0) {
atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
"bytes size:%" PRId64 "bytes",
uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes",
pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
} else { // old data exists, update the node
bool addToTrashcan = false;
if (T_REF_VAL_GET(pOld) > 0) {
} else { // duplicated key exists
while (1) {
// todo removed by node, instead of by key
int32_t succ = taosHashRemove(pCacheObj->pHashTable, pOld->key, pOld->keySize);
assert(succ == 0);
SHashNode *p = taosHashRemoveNode(pCacheObj->pHashTable, key, keyLen);
// add to trashcan
if (p != NULL) {
SCacheDataNode* pCachedNode = *(SCacheDataNode**)p->data;
if (T_REF_VAL_GET(pCachedNode) == 0) {
tfree(pCachedNode);
} else {
taosAddToTrash(pCacheObj, pCachedNode);
uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, pCachedNode);
}
}
addToTrashcan = true;
}
assert(T_REF_VAL_GET(pNode1) == 1);
T_REF_INC(pNode1);
int32_t ret = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
if (ret == 0) {
atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
// addedTime new element to hashtable
taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
__cache_unlock(pCacheObj);
uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes",
pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
// todo add trashcan lock
if (addToTrashcan) {
taosAddToTrash(pCacheObj, pOld);
} else {
free(pOld);
}
return pNode1;
uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, pOld);
} else {
// failed, try again
}
}
}
return pNode1->data;
......@@ -415,8 +419,6 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
}
if (_remove) {
// __cache_wr_lock(pCacheObj);
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
int32_t ref = T_REF_DEC(pNode);
uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref);
......@@ -429,34 +431,27 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
* that tries to do the same thing.
*/
if (pNode->inTrashCan) {
// __cache_unlock(pCacheObj);
if (ref == 0) {
assert(pNode->pTNodeHeader->pData == pNode);
taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
}
} else {
int32_t success = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
if (success) {
if (ref > 0) {
assert(pNode->pTNodeHeader == NULL);
if (ref > 0) {
assert(pNode->pTNodeHeader == NULL);
// todo trashcan lock
if (success) {
// todo trashcan lock
taosAddToTrash(pCacheObj, pNode);
} else { // ref == 0
atomic_fetch_sub_ptr(&pCacheObj->totalSize, pNode->size);
uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable),
pCacheObj->totalSize, pNode->size);
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
free(pNode);
}
// __cache_unlock(pCacheObj);
} else {
// __cache_unlock(pCacheObj);
// taosCacheReleaseNode(pCacheObj, pNode);
atomic_fetch_sub_ptr(&pCacheObj->totalSize, pNode->size);
uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize,
pNode->size);
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
free(pNode);
}
}
......
......@@ -419,7 +419,7 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void
pStore->iFunc = iFunc;
pStore->aFunc = aFunc;
pStore->appH = appH;
pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pStore->map == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
goto _err;
......
......@@ -50,7 +50,7 @@ int32_t vnodeInitResources() {
vnodeInitWriteFp();
vnodeInitReadFp();
tsDnodeVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true);
tsDnodeVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
if (tsDnodeVnodesHash == NULL) {
vError("failed to init vnode list");
return TSDB_CODE_VND_OUT_OF_MEMORY;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册