diff --git a/include/util/tcache.h b/include/util/tcache.h index 7c29ab4f58d3828a699e982bb49d8a16ede0f1c7..b5c1578380f6481b1d4a6e19b6b014c37c8eca35 100644 --- a/include/util/tcache.h +++ b/include/util/tcache.h @@ -40,9 +40,9 @@ typedef struct SCacheStatis { int64_t refreshCount; } SCacheStatis; -typedef struct SCacheObj SCacheObj; - -struct STrashElem; +typedef struct SCacheObj SCacheObj; +typedef struct SCacheIter SCacheIter; +typedef struct STrashElem STrashElem; /** * initialize the cache object @@ -106,6 +106,13 @@ void *taosCacheTransferData(SCacheObj *pCacheObj, void **data); */ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove); +/** + * + * @param pCacheObj + * @return + */ +size_t taosCacheGetNumOfObj(const SCacheObj* pCacheObj); + /** * move all data node into trash, clear node in trash can if it is not referenced by any clients * @param handle @@ -138,6 +145,12 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1); */ void taosStopCacheRefreshWorker(); +SCacheIter* taosCacheCreateIter(const SCacheObj* pCacheObj); +bool taosCacheIterNext(SCacheIter* pIter); +void* taosCacheIterGetData(const SCacheIter* pIter, size_t* dataLen); +void* taosCacheIterGetKey(const SCacheIter* pIter, size_t* keyLen); +void taosCacheDestroyIter(SCacheIter* pIter); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index e313c4d676e6e0770697ea879a0cf05271700538..54c2b2fbcdec58845c0a25432951ee87e464a154 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -48,7 +48,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, static void mndFreeConn(SConnObj *pConn); static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); -static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn); +static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter); static void mndCancelGetNextConn(SMnode *pMnode, void *pIter); static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq); static int32_t mndProcessConnectReq(SMnodeMsg *pReq); @@ -158,27 +158,23 @@ static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) { taosCacheRelease(pMgmt->cache, (void **)&pConn, false); } -static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn) { - SProfileMgmt *pMgmt = &pMnode->profileMgmt; - - *pConn = NULL; - - pIter = taosHashIterate(pMgmt->cache->pHashTable, pIter); - if (pIter == NULL) return NULL; - - SCacheDataNode **pNode = pIter; - if (pNode == NULL || *pNode == NULL) { - taosHashCancelIterate(pMgmt->cache->pHashTable, pIter); - return NULL; +void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) { + SConnObj* pConn = NULL; + bool hasNext = taosCacheIterNext(pIter); + if (hasNext) { + size_t dataLen = 0; + pConn = taosCacheIterGetData(pIter, &dataLen); + } else { + taosCacheDestroyIter(pIter); } - *pConn = (SConnObj *)((*pNode)->data); - return pIter; + return pConn; } static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) { - SProfileMgmt *pMgmt = &pMnode->profileMgmt; - taosHashCancelIterate(pMgmt->cache->pHashTable, pIter); + if (pIter != NULL) { + taosCacheDestroyIter(pIter); + } } static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { @@ -376,8 +372,8 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { int32_t rspLen = 0; mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen); if (rspMsg && rspLen > 0) { - SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; - taosArrayPush(hbRsp.info, &kv); + SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp.info, &kv1); } break; } @@ -386,8 +382,8 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { int32_t rspLen = 0; mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen); if (rspMsg && rspLen > 0) { - SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg}; - taosArrayPush(hbRsp.info, &kv); + SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp.info, &kv1); } break; } @@ -638,7 +634,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; } - pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable); + pShow->numOfRows = taosCacheGetNumOfObj(pMgmt->cache); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; strcpy(pMeta->tbName, mndShowStr(pShow->type)); @@ -653,8 +649,13 @@ static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, in char *pWrite; char ipStr[TSDB_IPv4ADDR_LEN + 6]; + if (pShow->pIter == NULL) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + pShow->pIter = taosCacheCreateIter(pMgmt->cache); + } + while (numOfRows < rows) { - pShow->pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn); + pConn = mndGetNextConn(pMnode, pShow->pIter); if (pConn == NULL) break; cols = 0; @@ -823,19 +824,24 @@ static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, void *pIter; char str[TSDB_IPv4ADDR_LEN + 6] = {0}; + if (pShow->pIter == NULL) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + pShow->pIter = taosCacheCreateIter(pMgmt->cache); + } + while (numOfRows < rows) { - pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn); + pConn = mndGetNextConn(pMnode, pShow->pIter); if (pConn == NULL) { - pShow->pIter = pIter; + pShow->pIter = NULL; break; } if (numOfRows + pConn->numOfQueries >= rows) { - mndCancelGetNextConn(pMnode, pIter); + taosCacheDestroyIter(pShow->pIter); + pShow->pIter = NULL; break; } - pShow->pIter = pIter; for (int32_t i = 0; i < pConn->numOfQueries; ++i) { SQueryDesc *pDesc = pConn->pQueries + i; cols = 0; @@ -913,6 +919,7 @@ static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, } static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) { - SProfileMgmt *pMgmt = &pMnode->profileMgmt; - taosHashCancelIterate(pMgmt->cache->pHashTable, pIter); + if (pIter != NULL) { + taosCacheDestroyIter(pIter); + } } diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index aec2530f5d709e4a66644c05d8bfed65ff32f968..39205582760d965bf67b8cdaf6d7d1fcb35bdfde 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -19,6 +19,9 @@ #include "tlog.h" #include "tutil.h" +#define CACHE_MAX_CAPACITY 1024*1024*16 +#define CACHE_DEFAULT_CAPACITY 1024*4 + static pthread_t cacheRefreshWorker = {0}; static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT; static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER; @@ -55,6 +58,14 @@ typedef struct STrashElem { SCacheNode *pData; } STrashElem; +typedef struct SCacheIter { + SCacheObj *pCacheObj; + SCacheNode **pCurrent; + int32_t entryIndex; + int32_t index; + int32_t numOfObj; +} SCacheIter; + /* * to accommodate the old data which has the same key value of new one in hashList * when an new node is put into cache, if an existed one with the same key: @@ -264,6 +275,7 @@ static void removeNodeInEntryList(SCacheEntry* pe, SCacheNode* prev, SCacheNode* prev->pNext = pNode->pNext; } + pNode->pNext = NULL; pe->num -= 1; } @@ -287,6 +299,57 @@ doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen, SCacheNode* return pNode; } +static bool doRemoveExpiredFn(void *param, SCacheNode* pNode) { + SCacheObjTravSup *ps = (SCacheObjTravSup *)param; + SCacheObj *pCacheObj = ps->pCacheObj; + + if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) { + taosCacheReleaseNode(pCacheObj, pNode); + + // this node should be remove from hash table + return false; + } + + if (ps->fp) { + (ps->fp)(pNode->data, ps->param1); + } + + // do not remove element in hash table + return true; +} + +static bool doRemoveNodeFn(void *param, SCacheNode *pNode) { + SCacheObjTravSup *ps = (SCacheObjTravSup *)param; + SCacheObj *pCacheObj = ps->pCacheObj; + + if (T_REF_VAL_GET(pNode) == 0) { + taosCacheReleaseNode(pCacheObj, pNode); + } else { // do add to trashcan + taosAddToTrashcan(pCacheObj, pNode); + } + + // this node should be remove from hash table + return false; +} + +static FORCE_INLINE int32_t getCacheCapacity(int32_t length) { + int32_t len = 0; + if (length < CACHE_DEFAULT_CAPACITY) { + len = CACHE_DEFAULT_CAPACITY; + return len; + } else if (length > CACHE_MAX_CAPACITY) { + len = CACHE_MAX_CAPACITY; + return len; + } + + len = CACHE_DEFAULT_CAPACITY; + while (len < length && len < CACHE_MAX_CAPACITY) { + len = (len << 1u); + } + + return len > CACHE_MAX_CAPACITY? CACHE_MAX_CAPACITY:len; +} + SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char *cacheName) { const int32_t SLEEP_DURATION = 500; // 500 ms @@ -301,7 +364,9 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext return NULL; } - pCacheObj->pEntryList = calloc(4096, sizeof(SCacheEntry)); + // TODO add the auto extend procedure + pCacheObj->capacity = 4096; + pCacheObj->pEntryList = calloc(pCacheObj->capacity, sizeof(SCacheEntry)); if (pCacheObj->pEntryList == NULL) { free(pCacheObj); uError("failed to allocate memory, reason:%s", strerror(errno)); @@ -309,7 +374,6 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext } // set free cache node callback function - pCacheObj->capacity = 4096; // todo refactor pCacheObj->hashFp = taosGetDefaultHashFunction(keyType); pCacheObj->freeFp = fn; pCacheObj->refreshTime = refreshTimeInSeconds * 1000; @@ -582,20 +646,6 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { } } -static bool doRemoveNodeFn(void *param, SCacheNode *pNode) { - SCacheObjTravSup *ps = (SCacheObjTravSup *)param; - SCacheObj *pCacheObj = ps->pCacheObj; - - if (T_REF_VAL_GET(pNode) == 0) { - taosCacheReleaseNode(pCacheObj, pNode); - } else { // do add to trashcan - taosAddToTrashcan(pCacheObj, pNode); - } - - // this node should be remove from hash table - return false; -} - void doTraverseElems(SCacheObj* pCacheObj, bool (*fp)(void *param, SCacheNode* pNode), SCacheObjTravSup* pSup) { int32_t numOfEntries = (int32_t)pCacheObj->capacity; for (int32_t i = 0; i < numOfEntries; ++i) { @@ -757,25 +807,6 @@ void doCleanupDataCache(SCacheObj *pCacheObj) { free(pCacheObj); } -bool doRemoveExpiredFn(void *param, SCacheNode* pNode) { - SCacheObjTravSup *ps = (SCacheObjTravSup *)param; - SCacheObj *pCacheObj = ps->pCacheObj; - - if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) { - taosCacheReleaseNode(pCacheObj, pNode); - - // this node should be remove from hash table - return false; - } - - if (ps->fp) { - (ps->fp)(pNode->data, ps->param1); - } - - // do not remove element in hash table - return true; -} - static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) { assert(pCacheObj != NULL); @@ -877,4 +908,94 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1) doCacheRefresh(pCacheObj, now, fp, param1); } -void taosStopCacheRefreshWorker(void) { stopRefreshWorker = true; } \ No newline at end of file +void taosStopCacheRefreshWorker(void) { + stopRefreshWorker = true; +} + +size_t taosCacheGetNumOfObj(const SCacheObj* pCacheObj) { + return pCacheObj->numOfElems + pCacheObj->numOfElemsInTrash; +} + +SCacheIter* taosCacheCreateIter(const SCacheObj* pCacheObj) { + ASSERT(pCacheObj != NULL); + SCacheIter* pIter = calloc(1, sizeof(SCacheIter)); + pIter->pCacheObj = (SCacheObj*) pCacheObj; + pIter->entryIndex = -1; + pIter->index = -1; + return pIter; +} + +bool taosCacheIterNext(SCacheIter* pIter) { + SCacheObj* pCacheObj = pIter->pCacheObj; + + if (pIter->index + 1 >= pIter->numOfObj) { + if (pIter->entryIndex + 1 >= pCacheObj->capacity) { + return false; + } + + // release the reference for all objects in the snapshot + for(int32_t i = 0; i < pIter->numOfObj; ++i) { + char* p= pIter->pCurrent[i]->data; + taosCacheRelease(pCacheObj, (void**) &p, false); + pIter->pCurrent[i] = NULL; + } + + while(1) { + SCacheEntry *pEntry = &pCacheObj->pEntryList[++pIter->entryIndex]; + taosRLockLatch(&pEntry->latch); + + if (pEntry->num == 0) { + taosRUnLockLatch(&pEntry->latch); + continue; + } + + if (pIter->numOfObj < pEntry->num) { + char *tmp = realloc(pIter->pCurrent, pEntry->num * POINTER_BYTES); + if (tmp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosRUnLockLatch(&pEntry->latch); + return false; + } + + pIter->pCurrent = (SCacheNode **)tmp; + } + + SCacheNode* pNode = pEntry->next; + for (int32_t i = 0; i < pEntry->num; ++i) { + ASSERT(pNode != NULL); + + pIter->pCurrent[i] = pNode; + int32_t ref = T_REF_INC(pIter->pCurrent[i]); + ASSERT(ref >= 1); + + pNode = pNode->pNext; + } + + pIter->numOfObj = pEntry->num; + taosRUnLockLatch(&pEntry->latch); + + pIter->index = -1; + break; + } + } + + pIter->index += 1; + return true; +} + +void* taosCacheIterGetData(const SCacheIter* pIter, size_t* len) { + SCacheNode* pNode = pIter->pCurrent[pIter->index]; + *len = pNode->dataLen; + return pNode->data; +} + +void* taosCacheIterGetKey(const SCacheIter* pIter, size_t* len) { + SCacheNode* pNode = pIter->pCurrent[pIter->index]; + *len = pNode->keyLen; + return pNode->key; +} + +void taosCacheDestroyIter(SCacheIter* pIter) { + tfree(pIter->pCurrent); + tfree(pIter); +} \ No newline at end of file diff --git a/source/util/test/cacheTest.cpp b/source/util/test/cacheTest.cpp index 970f1c23a9971af1e8dc8c6c89ad7805fce1457c..2fca340599a0493d0930f5c216b3bd8a0789ae8b 100644 --- a/source/util/test/cacheTest.cpp +++ b/source/util/test/cacheTest.cpp @@ -6,7 +6,7 @@ #include "tcache.h" // test cache -TEST(testCase, client_cache_test) { +TEST(cacheTest, client_cache_test) { const int32_t REFRESH_TIME_IN_SEC = 2; SCacheObj* tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, 0, NULL, "test"); @@ -92,7 +92,7 @@ TEST(testCase, client_cache_test) { taosCacheCleanup(tscMetaCache); } -TEST(testCase, cache_resize_test) { +TEST(cacheTest, cache_iter_test) { const int32_t REFRESH_TIME_IN_SEC = 2; auto* pCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, false, NULL, "test"); @@ -107,6 +107,7 @@ TEST(testCase, cache_resize_test) { int32_t len = sprintf(key, "abc_%7d", i); taosCachePut(pCache, key, strlen(key), data, len, 3600); } + uint64_t endTime = taosGetTimestampUs(); printf("add %d object cost:%" PRIu64 " us, avg:%f us\n", num, endTime - startTime, (endTime-startTime)/(double)num); @@ -120,5 +121,22 @@ TEST(testCase, cache_resize_test) { endTime = taosGetTimestampUs(); printf("retrieve %d object cost:%" PRIu64 " us,avg:%f\n", num, endTime - startTime, (endTime - startTime)/(double)num); + int32_t count = 0; + SCacheIter* pIter = taosCacheCreateIter(pCache); + while(taosCacheIterNext(pIter)) { + size_t keyLen = 0; + size_t dataLen = 0; + + char* key1 = static_cast(taosCacheIterGetKey(pIter, &keyLen)); + char* data1 = static_cast(taosCacheIterGetData(pIter, &dataLen)); + +// char d[256] = {0}; +// memcpy(d, data1, dataLen); +// char k[256] = {0}; +// memcpy(k, key1, keyLen); + } + + ASSERT_EQ(count, num); + taosCacheCleanup(pCache); -} \ No newline at end of file +}