提交 0ca24f3e 编写于 作者: H Haojun Liao

[td-13039] refactor cache.

上级 9b5b74b8
......@@ -40,9 +40,9 @@ typedef struct SCacheStatis {
int64_t refreshCount;
} SCacheStatis;
typedef struct SCacheObj SCacheObj;
struct STrashElem;
typedef struct SCacheObj SCacheObj;
typedef struct SCacheIter SCacheIter;
typedef struct STrashElem STrashElem;
/**
* initialize the cache object
......@@ -106,6 +106,13 @@ void *taosCacheTransferData(SCacheObj *pCacheObj, void **data);
*/
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove);
/**
*
* @param pCacheObj
* @return
*/
size_t taosCacheGetNumOfObj(const SCacheObj* pCacheObj);
/**
* move all data node into trash, clear node in trash can if it is not referenced by any clients
* @param handle
......@@ -138,6 +145,12 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1);
*/
void taosStopCacheRefreshWorker();
SCacheIter* taosCacheCreateIter(const SCacheObj* pCacheObj);
bool taosCacheIterNext(SCacheIter* pIter);
void* taosCacheIterGetData(const SCacheIter* pIter, size_t* dataLen);
void* taosCacheIterGetKey(const SCacheIter* pIter, size_t* keyLen);
void taosCacheDestroyIter(SCacheIter* pIter);
#ifdef __cplusplus
}
#endif
......
......@@ -48,7 +48,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid,
static void mndFreeConn(SConnObj *pConn);
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn);
static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq);
static int32_t mndProcessConnectReq(SMnodeMsg *pReq);
......@@ -158,27 +158,23 @@ static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
}
static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
*pConn = NULL;
pIter = taosHashIterate(pMgmt->cache->pHashTable, pIter);
if (pIter == NULL) return NULL;
SCacheDataNode **pNode = pIter;
if (pNode == NULL || *pNode == NULL) {
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
return NULL;
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
SConnObj* pConn = NULL;
bool hasNext = taosCacheIterNext(pIter);
if (hasNext) {
size_t dataLen = 0;
pConn = taosCacheIterGetData(pIter, &dataLen);
} else {
taosCacheDestroyIter(pIter);
}
*pConn = (SConnObj *)((*pNode)->data);
return pIter;
return pConn;
}
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
if (pIter != NULL) {
taosCacheDestroyIter(pIter);
}
}
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
......@@ -376,8 +372,8 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
int32_t rspLen = 0;
mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv);
SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
......@@ -386,8 +382,8 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
int32_t rspLen = 0;
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv);
SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
......@@ -638,7 +634,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable);
pShow->numOfRows = taosCacheGetNumOfObj(pMgmt->cache);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
......@@ -653,8 +649,13 @@ static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, in
char *pWrite;
char ipStr[TSDB_IPv4ADDR_LEN + 6];
if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
}
while (numOfRows < rows) {
pShow->pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
pConn = mndGetNextConn(pMnode, pShow->pIter);
if (pConn == NULL) break;
cols = 0;
......@@ -823,19 +824,24 @@ static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data,
void *pIter;
char str[TSDB_IPv4ADDR_LEN + 6] = {0};
if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
}
while (numOfRows < rows) {
pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
pConn = mndGetNextConn(pMnode, pShow->pIter);
if (pConn == NULL) {
pShow->pIter = pIter;
pShow->pIter = NULL;
break;
}
if (numOfRows + pConn->numOfQueries >= rows) {
mndCancelGetNextConn(pMnode, pIter);
taosCacheDestroyIter(pShow->pIter);
pShow->pIter = NULL;
break;
}
pShow->pIter = pIter;
for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
SQueryDesc *pDesc = pConn->pQueries + i;
cols = 0;
......@@ -913,6 +919,7 @@ static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data,
}
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
if (pIter != NULL) {
taosCacheDestroyIter(pIter);
}
}
......@@ -19,6 +19,9 @@
#include "tlog.h"
#include "tutil.h"
#define CACHE_MAX_CAPACITY 1024*1024*16
#define CACHE_DEFAULT_CAPACITY 1024*4
static pthread_t cacheRefreshWorker = {0};
static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;
......@@ -55,6 +58,14 @@ typedef struct STrashElem {
SCacheNode *pData;
} STrashElem;
typedef struct SCacheIter {
SCacheObj *pCacheObj;
SCacheNode **pCurrent;
int32_t entryIndex;
int32_t index;
int32_t numOfObj;
} SCacheIter;
/*
* to accommodate the old data which has the same key value of new one in hashList
* when an new node is put into cache, if an existed one with the same key:
......@@ -264,6 +275,7 @@ static void removeNodeInEntryList(SCacheEntry* pe, SCacheNode* prev, SCacheNode*
prev->pNext = pNode->pNext;
}
pNode->pNext = NULL;
pe->num -= 1;
}
......@@ -287,6 +299,57 @@ doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen, SCacheNode*
return pNode;
}
static bool doRemoveExpiredFn(void *param, SCacheNode* pNode) {
SCacheObjTravSup *ps = (SCacheObjTravSup *)param;
SCacheObj *pCacheObj = ps->pCacheObj;
if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode);
// this node should be remove from hash table
return false;
}
if (ps->fp) {
(ps->fp)(pNode->data, ps->param1);
}
// do not remove element in hash table
return true;
}
static bool doRemoveNodeFn(void *param, SCacheNode *pNode) {
SCacheObjTravSup *ps = (SCacheObjTravSup *)param;
SCacheObj *pCacheObj = ps->pCacheObj;
if (T_REF_VAL_GET(pNode) == 0) {
taosCacheReleaseNode(pCacheObj, pNode);
} else { // do add to trashcan
taosAddToTrashcan(pCacheObj, pNode);
}
// this node should be remove from hash table
return false;
}
static FORCE_INLINE int32_t getCacheCapacity(int32_t length) {
int32_t len = 0;
if (length < CACHE_DEFAULT_CAPACITY) {
len = CACHE_DEFAULT_CAPACITY;
return len;
} else if (length > CACHE_MAX_CAPACITY) {
len = CACHE_MAX_CAPACITY;
return len;
}
len = CACHE_DEFAULT_CAPACITY;
while (len < length && len < CACHE_MAX_CAPACITY) {
len = (len << 1u);
}
return len > CACHE_MAX_CAPACITY? CACHE_MAX_CAPACITY:len;
}
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn,
const char *cacheName) {
const int32_t SLEEP_DURATION = 500; // 500 ms
......@@ -301,7 +364,9 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
return NULL;
}
pCacheObj->pEntryList = calloc(4096, sizeof(SCacheEntry));
// TODO add the auto extend procedure
pCacheObj->capacity = 4096;
pCacheObj->pEntryList = calloc(pCacheObj->capacity, sizeof(SCacheEntry));
if (pCacheObj->pEntryList == NULL) {
free(pCacheObj);
uError("failed to allocate memory, reason:%s", strerror(errno));
......@@ -309,7 +374,6 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
}
// set free cache node callback function
pCacheObj->capacity = 4096; // todo refactor
pCacheObj->hashFp = taosGetDefaultHashFunction(keyType);
pCacheObj->freeFp = fn;
pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
......@@ -582,20 +646,6 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
}
}
static bool doRemoveNodeFn(void *param, SCacheNode *pNode) {
SCacheObjTravSup *ps = (SCacheObjTravSup *)param;
SCacheObj *pCacheObj = ps->pCacheObj;
if (T_REF_VAL_GET(pNode) == 0) {
taosCacheReleaseNode(pCacheObj, pNode);
} else { // do add to trashcan
taosAddToTrashcan(pCacheObj, pNode);
}
// this node should be remove from hash table
return false;
}
void doTraverseElems(SCacheObj* pCacheObj, bool (*fp)(void *param, SCacheNode* pNode), SCacheObjTravSup* pSup) {
int32_t numOfEntries = (int32_t)pCacheObj->capacity;
for (int32_t i = 0; i < numOfEntries; ++i) {
......@@ -757,25 +807,6 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
free(pCacheObj);
}
bool doRemoveExpiredFn(void *param, SCacheNode* pNode) {
SCacheObjTravSup *ps = (SCacheObjTravSup *)param;
SCacheObj *pCacheObj = ps->pCacheObj;
if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode);
// this node should be remove from hash table
return false;
}
if (ps->fp) {
(ps->fp)(pNode->data, ps->param1);
}
// do not remove element in hash table
return true;
}
static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) {
assert(pCacheObj != NULL);
......@@ -877,4 +908,94 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1)
doCacheRefresh(pCacheObj, now, fp, param1);
}
void taosStopCacheRefreshWorker(void) { stopRefreshWorker = true; }
\ No newline at end of file
void taosStopCacheRefreshWorker(void) {
stopRefreshWorker = true;
}
size_t taosCacheGetNumOfObj(const SCacheObj* pCacheObj) {
return pCacheObj->numOfElems + pCacheObj->numOfElemsInTrash;
}
SCacheIter* taosCacheCreateIter(const SCacheObj* pCacheObj) {
ASSERT(pCacheObj != NULL);
SCacheIter* pIter = calloc(1, sizeof(SCacheIter));
pIter->pCacheObj = (SCacheObj*) pCacheObj;
pIter->entryIndex = -1;
pIter->index = -1;
return pIter;
}
bool taosCacheIterNext(SCacheIter* pIter) {
SCacheObj* pCacheObj = pIter->pCacheObj;
if (pIter->index + 1 >= pIter->numOfObj) {
if (pIter->entryIndex + 1 >= pCacheObj->capacity) {
return false;
}
// release the reference for all objects in the snapshot
for(int32_t i = 0; i < pIter->numOfObj; ++i) {
char* p= pIter->pCurrent[i]->data;
taosCacheRelease(pCacheObj, (void**) &p, false);
pIter->pCurrent[i] = NULL;
}
while(1) {
SCacheEntry *pEntry = &pCacheObj->pEntryList[++pIter->entryIndex];
taosRLockLatch(&pEntry->latch);
if (pEntry->num == 0) {
taosRUnLockLatch(&pEntry->latch);
continue;
}
if (pIter->numOfObj < pEntry->num) {
char *tmp = realloc(pIter->pCurrent, pEntry->num * POINTER_BYTES);
if (tmp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosRUnLockLatch(&pEntry->latch);
return false;
}
pIter->pCurrent = (SCacheNode **)tmp;
}
SCacheNode* pNode = pEntry->next;
for (int32_t i = 0; i < pEntry->num; ++i) {
ASSERT(pNode != NULL);
pIter->pCurrent[i] = pNode;
int32_t ref = T_REF_INC(pIter->pCurrent[i]);
ASSERT(ref >= 1);
pNode = pNode->pNext;
}
pIter->numOfObj = pEntry->num;
taosRUnLockLatch(&pEntry->latch);
pIter->index = -1;
break;
}
}
pIter->index += 1;
return true;
}
void* taosCacheIterGetData(const SCacheIter* pIter, size_t* len) {
SCacheNode* pNode = pIter->pCurrent[pIter->index];
*len = pNode->dataLen;
return pNode->data;
}
void* taosCacheIterGetKey(const SCacheIter* pIter, size_t* len) {
SCacheNode* pNode = pIter->pCurrent[pIter->index];
*len = pNode->keyLen;
return pNode->key;
}
void taosCacheDestroyIter(SCacheIter* pIter) {
tfree(pIter->pCurrent);
tfree(pIter);
}
\ No newline at end of file
......@@ -6,7 +6,7 @@
#include "tcache.h"
// test cache
TEST(testCase, client_cache_test) {
TEST(cacheTest, client_cache_test) {
const int32_t REFRESH_TIME_IN_SEC = 2;
SCacheObj* tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, 0, NULL, "test");
......@@ -92,7 +92,7 @@ TEST(testCase, client_cache_test) {
taosCacheCleanup(tscMetaCache);
}
TEST(testCase, cache_resize_test) {
TEST(cacheTest, cache_iter_test) {
const int32_t REFRESH_TIME_IN_SEC = 2;
auto* pCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, false, NULL, "test");
......@@ -107,6 +107,7 @@ TEST(testCase, cache_resize_test) {
int32_t len = sprintf(key, "abc_%7d", i);
taosCachePut(pCache, key, strlen(key), data, len, 3600);
}
uint64_t endTime = taosGetTimestampUs();
printf("add %d object cost:%" PRIu64 " us, avg:%f us\n", num, endTime - startTime, (endTime-startTime)/(double)num);
......@@ -120,5 +121,22 @@ TEST(testCase, cache_resize_test) {
endTime = taosGetTimestampUs();
printf("retrieve %d object cost:%" PRIu64 " us,avg:%f\n", num, endTime - startTime, (endTime - startTime)/(double)num);
int32_t count = 0;
SCacheIter* pIter = taosCacheCreateIter(pCache);
while(taosCacheIterNext(pIter)) {
size_t keyLen = 0;
size_t dataLen = 0;
char* key1 = static_cast<char*>(taosCacheIterGetKey(pIter, &keyLen));
char* data1 = static_cast<char*>(taosCacheIterGetData(pIter, &dataLen));
// char d[256] = {0};
// memcpy(d, data1, dataLen);
// char k[256] = {0};
// memcpy(k, key1, keyLen);
}
ASSERT_EQ(count, num);
taosCacheCleanup(pCache);
}
\ No newline at end of file
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册