提交 c2ce0fc4 编写于 作者: H Haojun Liao

[td-225] refactor codes

上级 ae244c9e
......@@ -30,15 +30,11 @@ typedef void (*_hash_free_fn_t)(void *param);
typedef struct SHashNode {
char *key;
// union {
struct SHashNode * prev;
// struct SHashEntry *prev1;
// };
//
struct SHashNode *prev;
struct SHashNode *next;
uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash
uint32_t keyLen; // length of the key
char data[];
char *data;
} SHashNode;
typedef struct SHashObj {
......@@ -109,6 +105,8 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
*/
void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen);
void taosHashRemoveNode();
/**
* clean up hash table
* @param handle
......
......@@ -19,6 +19,8 @@
#include "tulog.h"
#include "tutil.h"
#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR)
static FORCE_INLINE void __wr_lock(void *lock) {
if (lock == NULL) {
return;
......@@ -95,29 +97,19 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
* @param hashVal hash value by hash function
* @return
*/
FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, uint32_t *hashVal) {
uint32_t hash = (*pHashObj->hashFp)(key, keyLen);
FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, uint32_t hashVal) {
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
int32_t slot = HASH_INDEX(hash, pHashObj->capacity);
SHashNode *pNode = pHashObj->hashList[slot];
while (pNode) {
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
assert(pNode->hashVal == hashVal);
break;
}
pNode = pNode->next;
}
if (pNode) {
assert(HASH_INDEX(pNode->hashVal, pHashObj->capacity) == slot);
}
// return the calculated hash value, to avoid calculating it again in other functions
if (hashVal != NULL) {
*hashVal = hash;
}
return pNode;
}
......@@ -148,7 +140,13 @@ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *p
* @param dsize size of actual data
* @return hash node
*/
static SHashNode *doUpdateHashNode(SHashNode *pNode, const void *key, size_t keyLen, const void *pData, size_t dsize);
static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode *pNode, SHashNode *pNewNode) {
assert(pNode->keyLen == pNewNode->keyLen);
SWAP(pNode->key, pNewNode->key, void*);
SWAP(pNode->data, pNewNode->data, void*);
return pNewNode;
}
/**
* insert the hash node at the front of the linked list
......@@ -217,58 +215,43 @@ size_t taosHashGetSize(const SHashObj *pHashObj) {
}
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
__wr_lock(pHashObj->lock);
uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen);
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
return -1;
}
uint32_t hashVal = 0;
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, &hashVal);
__wr_lock(pHashObj->lock);
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, hashVal);
if (pNode == NULL) { // no data in hash table with the specified key, add it into hash table
taosHashTableResize(pHashObj);
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
__unlock(pHashObj->lock);
return -1;
if (HASH_NEED_RESIZE(pHashObj)) {
taosHashTableResize(pHashObj);
}
doAddToHashTable(pHashObj, pNewNode);
} else {
SHashNode *pNewNode = doUpdateHashNode(pNode, key, keyLen, data, size);
if (pNewNode == NULL) {
__unlock(pHashObj->lock);
return -1;
}
if (pNewNode->prev) {
pNewNode->prev->next = pNewNode;
} else {
int32_t slot = HASH_INDEX(pNewNode->hashVal, pHashObj->capacity);
assert(pHashObj->hashList[slot] == pNode);
pHashObj->hashList[slot] = pNewNode;
}
doUpdateHashNode(pNode, pNewNode);
__unlock(pHashObj->lock);
if (pNewNode->next) {
(pNewNode->next)->prev = pNewNode;
}
tfree(pNewNode->data)
tfree(pNewNode);
}
__unlock(pHashObj->lock);
return 0;
}
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
__rd_lock(pHashObj->lock);
uint32_t hashVal = 0;
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, &hashVal);
uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen);
__rd_lock(pHashObj->lock);
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, hashVal);
__unlock(pHashObj->lock);
if (pNode != NULL) {
if (pNode) {
assert(pNode->hashVal == hashVal);
return pNode->data;
} else {
return NULL;
......@@ -276,10 +259,10 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
}
void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
__wr_lock(pHashObj->lock);
uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen);
uint32_t val = 0;
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, &val);
__wr_lock(pHashObj->lock);
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, hashVal);
if (pNode == NULL) {
__unlock(pHashObj->lock);
return;
......@@ -287,7 +270,7 @@ void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
SHashNode *pNext = pNode->next;
if (pNode->prev == NULL) {
int32_t slot = HASH_INDEX(val, pHashObj->capacity);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
assert(pHashObj->hashList[slot] == pNode);
pHashObj->hashList[slot] = pNext;
......@@ -299,13 +282,13 @@ void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
pNext->prev = pNode->prev;
}
pHashObj->size--;
pHashObj->size -= 1;
__unlock(pHashObj->lock);
pNode->next = NULL;
pNode->prev = NULL;
tfree(pNode);
__unlock(pHashObj->lock);
}
void taosHashCleanup(SHashObj *pHashObj) {
......@@ -341,14 +324,6 @@ void taosHashCleanup(SHashObj *pHashObj) {
free(pHashObj);
}
void taosHashSetFreecb(SHashObj *pHashObj, _hash_free_fn_t freeFp) {
if (pHashObj == NULL || freeFp == NULL) {
return;
}
pHashObj->freeFp = freeFp;
}
SHashMutableIterator *taosHashCreateIter(SHashObj *pHashObj) {
SHashMutableIterator *pIter = calloc(1, sizeof(SHashMutableIterator));
if (pIter == NULL) {
......@@ -528,41 +503,23 @@ void taosHashTableResize(SHashObj *pHashObj) {
}
SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
size_t totalSize = dsize + sizeof(SHashNode) + keyLen;
SHashNode *pNewNode = calloc(1, totalSize);
SHashNode *pNewNode = calloc(1, sizeof(SHashNode));
if (pNewNode == NULL) {
uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
pNewNode->data = malloc(dsize + keyLen);
memcpy(pNewNode->data, pData, dsize);
pNewNode->key = pNewNode->data + dsize;
memcpy(pNewNode->key, key, keyLen);
pNewNode->keyLen = keyLen;
pNewNode->keyLen = keyLen;
pNewNode->hashVal = hashVal;
return pNewNode;
}
SHashNode *doUpdateHashNode(SHashNode *pNode, const void *key, size_t keyLen, const void *pData, size_t dsize) {
size_t size = dsize + sizeof(SHashNode) + keyLen;
SHashNode *pNewNode = (SHashNode *)realloc(pNode, size);
if (pNewNode == NULL) {
return NULL;
}
memcpy(pNewNode->data, pData, dsize);
pNewNode->key = pNewNode->data + dsize;
assert(memcmp(pNewNode->key, key, keyLen) == 0 && keyLen == pNewNode->keyLen);
memcpy(pNewNode->key, key, keyLen);
return pNewNode;
}
void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode) {
assert(pNode != NULL);
......
......@@ -147,49 +147,18 @@ static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pCacheObj, SCacheDataNo
* @param dataSize
* @return
*/
static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pCacheObj, SCacheDataNode *pNode, const char *key, int32_t keyLen,
const void *pData, uint32_t dataSize, uint64_t duration) {
SCacheDataNode *pNewNode = NULL;
static UNUSED_FUNC SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pCacheObj, SCacheDataNode* pNode, SCacheDataNode* pNewNode,
const char *key, int32_t keyLen) {
// only a node is not referenced by any other object, in-place update it
if (T_REF_VAL_GET(pNode) == 0) {
size_t newSize = sizeof(SCacheDataNode) + dataSize + keyLen + 1;
pNewNode = (SCacheDataNode *)realloc(pNode, newSize);
if (pNewNode == NULL) {
return NULL;
}
memset(pNewNode, 0, newSize);
pNewNode->signature = (uint64_t)pNewNode;
memcpy(pNewNode->data, pData, dataSize);
pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + dataSize;
pNewNode->keySize = keyLen;
memcpy(pNewNode->key, key, keyLen);
// update the timestamp information for updated key/value
pNewNode->addedTime = taosGetTimestampMs();
pNewNode->lifespan = duration;
T_REF_INC(pNewNode);
// the address of this node may be changed, so the prev and next element should update the corresponding pointer
taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
} else {
if (T_REF_VAL_GET(pNode) > 0) {
taosCacheMoveToTrash(pCacheObj, pNode);
pNewNode = taosCreateCacheNode(key, keyLen, pData, dataSize, duration);
if (pNewNode == NULL) {
return NULL;
}
T_REF_INC(pNewNode);
// addedTime new element to hashtable
taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
}
return pNewNode;
}
......@@ -238,7 +207,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
return NULL;
}
pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false);
pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), true);
pCacheObj->name = strdup(cacheName);
if (pCacheObj->pHashTable == NULL) {
free(pCacheObj);
......@@ -270,36 +239,59 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
}
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) {
SCacheDataNode *pNode;
// SCacheDataNode *pNode = NULL;
if (pCacheObj == NULL || pCacheObj->pHashTable == NULL) {
return NULL;
}
SCacheDataNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, duration);
if (pNode1 == NULL) {
uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
return NULL;
}
__cache_wr_lock(pCacheObj);
SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL;
if (pOld == NULL) { // do addedTime to cache
pNode = taosAddToCacheImpl(pCacheObj, key, keyLen, pData, dataSize, duration * 1000L);
if (NULL != pNode) {
pCacheObj->totalSize += pNode->size;
T_REF_INC(pNode1);
taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
__cache_unlock(pCacheObj);
atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
"bytes size:%" PRId64 "bytes",
pCacheObj->name, key, pNode->data, pNode->addedTime, pNode->expireTime,
pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
} else {
uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
}
} else { // old data exists, update the node
pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L);
uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode->data, pOld);
bool addToTrashcan = false;
if (T_REF_VAL_GET(pOld) > 0) {
// todo removed by node, instead of by key
taosHashRemove(pCacheObj->pHashTable, pOld->key, pOld->keySize);
} else {
addToTrashcan = true;
}
T_REF_INC(pNode1);
// addedTime new element to hashtable
taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
__cache_unlock(pCacheObj);
return (pNode != NULL) ? pNode->data : NULL;
// todo add trashcan lock
if (addToTrashcan) {
taosAddToTrash(pCacheObj, pOld);
} else {
tfree(pOld);
}
uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, pOld);
}
return pNode1->data;
}
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
......@@ -310,7 +302,6 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
void *pData = NULL;
__cache_rd_lock(pCacheObj);
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
int32_t ref = 0;
......@@ -439,21 +430,33 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
* that tries to do the same thing.
*/
if (pNode->inTrashCan) {
__cache_unlock(pCacheObj);
if (ref == 0) {
assert(pNode->pTNodeHeader->pData == pNode);
taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
}
} else {
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
__cache_unlock(pCacheObj);
if (ref > 0) {
assert(pNode->pTNodeHeader == NULL);
taosCacheMoveToTrash(pCacheObj, pNode);
// todo trashcan lock
taosAddToTrash(pCacheObj, pNode);
} else {
taosCacheReleaseNode(pCacheObj, pNode);
// taosCacheReleaseNode(pCacheObj, pNode);
atomic_fetch_sub_ptr(&pCacheObj->totalSize, pNode->size);
uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize,
pNode->size);
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
free(pNode);
}
}
__cache_unlock(pCacheObj);
} else {
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
int32_t ref = T_REF_DEC(pNode);
......@@ -497,8 +500,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
doCleanupDataCache(pCacheObj);
}
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
uint64_t duration) {
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) {
size_t totalSize = size + sizeof(SCacheDataNode) + keyLen;
SCacheDataNode *pNewNode = calloc(1, totalSize);
......
......@@ -207,7 +207,6 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle);
freehandle = false;
// todo test the error code case
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_HAS_RSP;
......@@ -267,7 +266,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
}
code = vnodeDumpQueryResult(pRet, pVnode, *handle, &freeHandle);
freeHandle = false;
}
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册