diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 82d5c0a6ead08ed673809335a90ae2d2406062ba..fdfabf12a3820d332b2404592a2f104a90fb76b6 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -26,6 +26,7 @@ extern "C" { #include "syncInt.h" #include "syncMessage.h" #include "taosdef.h" +#include "tskiplist.h" typedef struct SSyncRaftEntry { uint32_t bytes; @@ -58,29 +59,52 @@ void syncEntryLog(const SSyncRaftEntry* pObj); void syncEntryLog2(char* s, const SSyncRaftEntry* pObj); //----------------------------------- -typedef struct SRaftEntryCache { +typedef struct SRaftEntryHashCache { SHashObj* pEntryHash; int32_t maxCount; int32_t currentCount; TdThreadMutex mutex; SSyncNode* pSyncNode; +} SRaftEntryHashCache; + +SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount); +void raftCacheDestroy(SRaftEntryHashCache* pCache); +int32_t raftCachePutEntry(struct SRaftEntryHashCache* pCache, SSyncRaftEntry* pEntry); +int32_t raftCacheGetEntry(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry); +int32_t raftCacheGetEntryP(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry); +int32_t raftCacheDelEntry(struct SRaftEntryHashCache* pCache, SyncIndex index); +int32_t raftCacheGetAndDel(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry); +int32_t raftCacheClear(struct SRaftEntryHashCache* pCache); + +cJSON* raftCache2Json(SRaftEntryHashCache* pObj); +char* raftCache2Str(SRaftEntryHashCache* pObj); +void raftCachePrint(SRaftEntryHashCache* pObj); +void raftCachePrint2(char* s, SRaftEntryHashCache* pObj); +void raftCacheLog(SRaftEntryHashCache* pObj); +void raftCacheLog2(char* s, SRaftEntryHashCache* pObj); + +//----------------------------------- +typedef struct SRaftEntryCache { + SSkipList* pSkipList; + int32_t maxCount; + int32_t currentCount; + TdThreadMutex mutex; + SSyncNode* pSyncNode; } SRaftEntryCache; -SRaftEntryCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount); -void raftCacheDestroy(SRaftEntryCache* pCache); -int32_t raftCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry); -int32_t raftCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry); -int32_t raftCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry); -int32_t raftCacheDelEntry(struct SRaftEntryCache* pCache, SyncIndex index); -int32_t raftCacheGetAndDel(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry); -int32_t raftCacheClear(struct SRaftEntryCache* pCache); +SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount); +void raftEntryCacheDestroy(SRaftEntryCache* pCache); +int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry); +int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry); +int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry); +int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count); -cJSON* raftCache2Json(SRaftEntryCache* pObj); -char* raftCache2Str(SRaftEntryCache* pObj); -void raftCachePrint(SRaftEntryCache* pObj); -void raftCachePrint2(char* s, SRaftEntryCache* pObj); -void raftCacheLog(SRaftEntryCache* pObj); -void raftCacheLog2(char* s, SRaftEntryCache* pObj); +cJSON* raftEntryCache2Json(SRaftEntryCache* pObj); +char* raftEntryCache2Str(SRaftEntryCache* pObj); +void raftEntryCachePrint(SRaftEntryCache* pObj); +void raftEntryCachePrint2(char* s, SRaftEntryCache* pObj); +void raftEntryCacheLog(SRaftEntryCache* pObj); +void raftEntryCacheLog2(char* s, SRaftEntryCache* pObj); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 465584a40f0e867b52c0bb62aeb0d13cae551f67..89e67fab288b5a2a0594829b6a39b27bdf698c44 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -198,8 +198,8 @@ void syncEntryLog2(char* s, const SSyncRaftEntry* pObj) { } //----------------------------------- -SRaftEntryCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) { - SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache)); +SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) { + SRaftEntryHashCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryHashCache)); if (pCache == NULL) { sError("vgId:%d raft cache create error", pSyncNode->vgId); return NULL; @@ -220,7 +220,7 @@ SRaftEntryCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) { return pCache; } -void raftCacheDestroy(SRaftEntryCache* pCache) { +void raftCacheDestroy(SRaftEntryHashCache* pCache) { if (pCache != NULL) { taosThreadMutexLock(&(pCache->mutex)); taosHashCleanup(pCache->pEntryHash); @@ -233,7 +233,7 @@ void raftCacheDestroy(SRaftEntryCache* pCache) { // success, return 1 // max count, return 0 // error, return -1 -int32_t raftCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry) { +int32_t raftCachePutEntry(struct SRaftEntryHashCache* pCache, SSyncRaftEntry* pEntry) { taosThreadMutexLock(&(pCache->mutex)); if (pCache->currentCount >= pCache->maxCount) { @@ -259,7 +259,7 @@ int32_t raftCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry // success, return 0 // error, return -1 // not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST -int32_t raftCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) { +int32_t raftCacheGetEntry(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) { if (ppEntry == NULL) { return -1; } @@ -292,7 +292,7 @@ int32_t raftCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSync // success, return 0 // error, return -1 // not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST -int32_t raftCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) { +int32_t raftCacheGetEntryP(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) { if (ppEntry == NULL) { return -1; } @@ -321,7 +321,7 @@ int32_t raftCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyn return -1; } -int32_t raftCacheDelEntry(struct SRaftEntryCache* pCache, SyncIndex index) { +int32_t raftCacheDelEntry(struct SRaftEntryHashCache* pCache, SyncIndex index) { taosThreadMutexLock(&(pCache->mutex)); taosHashRemove(pCache->pEntryHash, &index, sizeof(index)); --(pCache->currentCount); @@ -329,7 +329,7 @@ int32_t raftCacheDelEntry(struct SRaftEntryCache* pCache, SyncIndex index) { return 0; } -int32_t raftCacheGetAndDel(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) { +int32_t raftCacheGetAndDel(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) { if (ppEntry == NULL) { return -1; } @@ -362,7 +362,7 @@ int32_t raftCacheGetAndDel(struct SRaftEntryCache* pCache, SyncIndex index, SSyn return -1; } -int32_t raftCacheClear(struct SRaftEntryCache* pCache) { +int32_t raftCacheClear(struct SRaftEntryHashCache* pCache) { taosThreadMutexLock(&(pCache->mutex)); taosHashClear(pCache->pEntryHash); pCache->currentCount = 0; @@ -371,7 +371,7 @@ int32_t raftCacheClear(struct SRaftEntryCache* pCache) { } //----------------------------------- -cJSON* raftCache2Json(SRaftEntryCache* pCache) { +cJSON* raftCache2Json(SRaftEntryHashCache* pCache) { char u64buf[128] = {0}; cJSON* pRoot = cJSON_CreateObject(); @@ -402,41 +402,283 @@ cJSON* raftCache2Json(SRaftEntryCache* pCache) { } cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SRaftEntryCache", pRoot); + cJSON_AddItemToObject(pJson, "SRaftEntryHashCache", pRoot); return pJson; } -char* raftCache2Str(SRaftEntryCache* pCache) { +char* raftCache2Str(SRaftEntryHashCache* pCache) { cJSON* pJson = raftCache2Json(pCache); char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } -void raftCachePrint(SRaftEntryCache* pCache) { +void raftCachePrint(SRaftEntryHashCache* pCache) { char* serialized = raftCache2Str(pCache); printf("raftCachePrint | len:%" PRIu64 " | %s \n", strlen(serialized), serialized); fflush(NULL); taosMemoryFree(serialized); } -void raftCachePrint2(char* s, SRaftEntryCache* pCache) { +void raftCachePrint2(char* s, SRaftEntryHashCache* pCache) { char* serialized = raftCache2Str(pCache); printf("raftCachePrint2 | len:%" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized); fflush(NULL); taosMemoryFree(serialized); } -void raftCacheLog(SRaftEntryCache* pCache) { +void raftCacheLog(SRaftEntryHashCache* pCache) { char* serialized = raftCache2Str(pCache); sTrace("raftCacheLog | len:%" PRIu64 " | %s", strlen(serialized), serialized); taosMemoryFree(serialized); } -void raftCacheLog2(char* s, SRaftEntryCache* pCache) { +void raftCacheLog2(char* s, SRaftEntryHashCache* pCache) { if (gRaftDetailLog) { char* serialized = raftCache2Str(pCache); sTraceLong("raftCacheLog2 | len:%" PRIu64 " | %s | %s", strlen(serialized), s, serialized); taosMemoryFree(serialized); } +} + +//----------------------------------- +static char* keyFn(const void* pData) { + SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData; + return (char*)(&(pEntry->index)); +} + +static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); } + +SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) { + SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache)); + if (pCache == NULL) { + sError("vgId:%d raft cache create error", pSyncNode->vgId); + return NULL; + } + + pCache->pSkipList = + tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn); + if (pCache->pSkipList == NULL) { + sError("vgId:%d raft cache create hash error", pSyncNode->vgId); + return NULL; + } + + taosThreadMutexInit(&(pCache->mutex), NULL); + pCache->maxCount = maxCount; + pCache->currentCount = 0; + pCache->pSyncNode = pSyncNode; + + return pCache; +} + +void raftEntryCacheDestroy(SRaftEntryCache* pCache) { + if (pCache != NULL) { + taosThreadMutexLock(&(pCache->mutex)); + tSkipListDestroy(pCache->pSkipList); + taosThreadMutexUnlock(&(pCache->mutex)); + taosThreadMutexDestroy(&(pCache->mutex)); + taosMemoryFree(pCache); + } +} + +// success, return 1 +// max count, return 0 +// error, return -1 +int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry) { + taosThreadMutexLock(&(pCache->mutex)); + + if (pCache->currentCount >= pCache->maxCount) { + taosThreadMutexUnlock(&(pCache->mutex)); + return 0; + } + + SSkipListNode* pSkipListNode = tSkipListPut(pCache->pSkipList, pEntry); + ASSERT(pSkipListNode != NULL); + ++(pCache->currentCount); + + do { + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d", + TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType, + pEntry->index, pEntry->bytes); + syncNodeEventLog(pCache->pSyncNode, eventLog); + } while (0); + + taosThreadMutexUnlock(&(pCache->mutex)); + return 1; +} + +// find one, return 1 +// not found, return 0 +// error, return -1 +int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) { + ASSERT(ppEntry != NULL); + SSyncRaftEntry* pEntry = NULL; + int32_t code = raftEntryCacheGetEntryP(pCache, index, &pEntry); + if (code == 1) { + *ppEntry = taosMemoryMalloc(pEntry->bytes); + memcpy(*ppEntry, pEntry, pEntry->bytes); + } else { + *ppEntry = NULL; + } + return code; +} + +// find one, return 1 +// not found, return 0 +// error, return -1 +int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) { + taosThreadMutexLock(&(pCache->mutex)); + + SyncIndex index2 = index; + int32_t code = 0; + + SArray* entryPArray = tSkipListGet(pCache->pSkipList, (char*)(&index2)); + int32_t arraySize = taosArrayGetSize(entryPArray); + if (arraySize == 1) { + SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(entryPArray, 0); + ASSERT(*ppNode != NULL); + *ppEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode); + code = 1; + + } else if (arraySize == 0) { + code = 0; + + } else { + ASSERT(0); + + code = -1; + } + taosArrayDestroy(entryPArray); + + taosThreadMutexUnlock(&(pCache->mutex)); + return code; +} + +// count = -1, clear all +// count >= 0, clear count +// return -1, error +// return delete count +int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) { + taosThreadMutexLock(&(pCache->mutex)); + int32_t returnCnt = 0; + + if (count == -1) { + // clear all + SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList); + while (tSkipListIterNext(pIter)) { + SSkipListNode* pNode = tSkipListIterGet(pIter); + ASSERT(pNode != NULL); + SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode); + syncEntryDestory(pEntry); + ++returnCnt; + } + tSkipListDestroyIter(pIter); + + tSkipListDestroy(pCache->pSkipList); + pCache->pSkipList = + tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn); + ASSERT(pCache->pSkipList != NULL); + + } else { + // clear count + int i = 0; + SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList); + SArray* delNodeArray = taosArrayInit(0, sizeof(SSkipListNode*)); + + // free entry + while (tSkipListIterNext(pIter)) { + SSkipListNode* pNode = tSkipListIterGet(pIter); + ASSERT(pNode != NULL); + if (i++ >= count) { + break; + } + + // sDebug("push pNode:%p", pNode); + taosArrayPush(delNodeArray, &pNode); + ++returnCnt; + SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode); + syncEntryDestory(pEntry); + } + tSkipListDestroyIter(pIter); + + // delete skiplist node + int32_t arraySize = taosArrayGetSize(delNodeArray); + for (int32_t i = 0; i < arraySize; ++i) { + SSkipListNode** ppNode = taosArrayGet(delNodeArray, i); + // sDebug("get pNode:%p", *ppNode); + tSkipListRemoveNode(pCache->pSkipList, *ppNode); + } + taosArrayDestroy(delNodeArray); + } + + pCache->currentCount -= returnCnt; + taosThreadMutexUnlock(&(pCache->mutex)); + return returnCnt; +} + +cJSON* raftEntryCache2Json(SRaftEntryCache* pCache) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pCache != NULL) { + taosThreadMutexLock(&(pCache->mutex)); + + snprintf(u64buf, sizeof(u64buf), "%p", pCache->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + cJSON_AddNumberToObject(pRoot, "currentCount", pCache->currentCount); + cJSON_AddNumberToObject(pRoot, "maxCount", pCache->maxCount); + cJSON* pEntries = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "entries", pEntries); + + SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList); + while (tSkipListIterNext(pIter)) { + SSkipListNode* pNode = tSkipListIterGet(pIter); + ASSERT(pNode != NULL); + SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode); + cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); + } + tSkipListDestroyIter(pIter); + + taosThreadMutexUnlock(&(pCache->mutex)); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SRaftEntryCache", pRoot); + return pJson; +} + +char* raftEntryCache2Str(SRaftEntryCache* pObj) { + cJSON* pJson = raftEntryCache2Json(pObj); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +void raftEntryCachePrint(SRaftEntryCache* pObj) { + char* serialized = raftEntryCache2Str(pObj); + printf("raftEntryCachePrint | len:%" PRIu64 " | %s \n", strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void raftEntryCachePrint2(char* s, SRaftEntryCache* pObj) { + char* serialized = raftEntryCache2Str(pObj); + printf("raftEntryCachePrint2 | len:%" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void raftEntryCacheLog(SRaftEntryCache* pObj) { + char* serialized = raftEntryCache2Str(pObj); + sTrace("raftEntryCacheLog | len:%" PRIu64 " | %s", strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void raftEntryCacheLog2(char* s, SRaftEntryCache* pObj) { + if (gRaftDetailLog) { + char* serialized = raftEntryCache2Str(pObj); + sTraceLong("raftEntryCacheLog2 | len:%" PRIu64 " | %s | %s", strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } } \ No newline at end of file diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 8e477a5159b8f23951b9479a2d17e406c57805a9..97e5816038a4213a222b336cdb687560290be4be 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -127,7 +127,7 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) { while (pStub) { size_t len; - void *key = taosHashGetKey(pStub, &len); + void * key = taosHashGetKey(pStub, &len); uint64_t *pSeqNum = (uint64_t *)key; int64_t nowMS = taosGetTimestampMs(); diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index e1f3a2b2fc18d1d0481576ccbb3363cf95c5358a..e787080795a36f77aca41d520ce7e232e9555358 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -18,6 +18,7 @@ add_executable(syncIndexMgrTest "") add_executable(syncLogStoreTest "") add_executable(syncEntryTest "") add_executable(syncEntryCacheTest "") +add_executable(syncHashCacheTest "") add_executable(syncRequestVoteTest "") add_executable(syncRequestVoteReplyTest "") add_executable(syncAppendEntriesTest "") @@ -137,6 +138,10 @@ target_sources(syncEntryCacheTest PRIVATE "syncEntryCacheTest.cpp" ) +target_sources(syncHashCacheTest + PRIVATE + "syncHashCacheTest.cpp" +) target_sources(syncRequestVoteTest PRIVATE "syncRequestVoteTest.cpp" @@ -387,6 +392,11 @@ target_include_directories(syncEntryCacheTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncHashCacheTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_include_directories(syncRequestVoteTest PUBLIC "${TD_SOURCE_DIR}/include/libs/sync" @@ -654,6 +664,10 @@ target_link_libraries(syncEntryCacheTest sync gtest_main ) +target_link_libraries(syncHashCacheTest + sync + gtest_main +) target_link_libraries(syncRequestVoteTest sync gtest_main diff --git a/source/libs/sync/test/syncEntryCacheTest.cpp b/source/libs/sync/test/syncEntryCacheTest.cpp index 6250181b25d5bb88319532cb53fac5843aac361a..66b93563e752f16b1af04dcc0272c6d5a7731cf1 100644 --- a/source/libs/sync/test/syncEntryCacheTest.cpp +++ b/source/libs/sync/test/syncEntryCacheTest.cpp @@ -43,222 +43,82 @@ SRaftEntryCache* createCache(int maxCount) { SSyncNode* pSyncNode = createFakeNode(); ASSERT(pSyncNode != NULL); - SRaftEntryCache* pCache = raftCacheCreate(pSyncNode, maxCount); + SRaftEntryCache* pCache = raftEntryCacheCreate(pSyncNode, maxCount); ASSERT(pCache != NULL); return pCache; } void test1() { - int32_t code = 0; + int32_t code = 0; SRaftEntryCache* pCache = createCache(5); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 10; ++i) { SSyncRaftEntry* pEntry = createEntry(i); - code = raftCachePutEntry(pCache, pEntry); - ASSERT(code == 1); - syncEntryDestory(pEntry); + code = raftEntryCachePutEntry(pCache, pEntry); + sTrace("put entry code:%d, pEntry:%p", code, pEntry); } - raftCacheLog2((char*)"==test1 write 5 entries==", pCache); + raftEntryCacheLog2((char*)"==test1 write 5 entries==", pCache); - SyncIndex index; - index = 1; - code = raftCacheDelEntry(pCache, index); - ASSERT(code == 0); - index = 3; - code = raftCacheDelEntry(pCache, index); - ASSERT(code == 0); - raftCacheLog2((char*)"==test1 delete 1,3==", pCache); + raftEntryCacheClear(pCache, 3); + raftEntryCacheLog2((char*)"==test1 evict 3 entries==", pCache); - code = raftCacheClear(pCache); - ASSERT(code == 0); - raftCacheLog2((char*)"==clear all==", pCache); + raftEntryCacheClear(pCache, -1); + raftEntryCacheLog2((char*)"==test1 evict -1(all) entries==", pCache); } void test2() { - int32_t code = 0; + int32_t code = 0; SRaftEntryCache* pCache = createCache(5); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 10; ++i) { SSyncRaftEntry* pEntry = createEntry(i); - code = raftCachePutEntry(pCache, pEntry); - ASSERT(code == 1); - syncEntryDestory(pEntry); + code = raftEntryCachePutEntry(pCache, pEntry); + sTrace("put entry code:%d, pEntry:%p", code, pEntry); } - raftCacheLog2((char*)"==test2 write 5 entries==", pCache); + raftEntryCacheLog2((char*)"==test1 write 5 entries==", pCache); - SyncIndex index; - index = 1; - SSyncRaftEntry* pEntry; - code = raftCacheGetEntry(pCache, index, &pEntry); - ASSERT(code == 0); - syncEntryDestory(pEntry); - syncEntryLog2((char*)"==test2 get entry 1==", pEntry); + SyncIndex index = 2; + SSyncRaftEntry* pEntry = NULL; - index = 2; - code = raftCacheGetEntryP(pCache, index, &pEntry); - ASSERT(code == 0); + code = raftEntryCacheGetEntryP(pCache, index, &pEntry); + ASSERT(code == 1 && index == pEntry->index); + sTrace("get entry:%p for %ld", pEntry, index); syncEntryLog2((char*)"==test2 get entry pointer 2==", pEntry); + code = raftEntryCacheGetEntry(pCache, index, &pEntry); + ASSERT(code == 1 && index == pEntry->index); + sTrace("get entry:%p for %ld", pEntry, index); + syncEntryLog2((char*)"==test2 get entry 2==", pEntry); + syncEntryDestory(pEntry); + // not found index = 8; - code = raftCacheGetEntry(pCache, index, &pEntry); - ASSERT(code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); + code = raftEntryCacheGetEntry(pCache, index, &pEntry); + ASSERT(code == 0); + sTrace("get entry:%p for %ld", pEntry, index); sTrace("==test2 get entry 8 not found=="); // not found index = 9; - code = raftCacheGetEntryP(pCache, index, &pEntry); - ASSERT(code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); - sTrace("==test2 get entry pointer 9 not found=="); + code = raftEntryCacheGetEntry(pCache, index, &pEntry); + ASSERT(code == 0); + sTrace("get entry:%p for %ld", pEntry, index); + sTrace("==test2 get entry 9 not found=="); } void test3() { - int32_t code = 0; - SRaftEntryCache* pCache = createCache(5); - for (int i = 0; i < 5; ++i) { - SSyncRaftEntry* pEntry = createEntry(i); - code = raftCachePutEntry(pCache, pEntry); - ASSERT(code == 1); - syncEntryDestory(pEntry); - } - for (int i = 6; i < 10; ++i) { - SSyncRaftEntry* pEntry = createEntry(i); - code = raftCachePutEntry(pCache, pEntry); - ASSERT(code == 0); - syncEntryDestory(pEntry); - } - raftCacheLog2((char*)"==test3 write 10 entries, max count is 5==", pCache); -} - -void test4() { - int32_t code = 0; - SRaftEntryCache* pCache = createCache(5); - for (int i = 0; i < 5; ++i) { + int32_t code = 0; + SRaftEntryCache* pCache = createCache(20); + for (int i = 0; i <= 4; ++i) { SSyncRaftEntry* pEntry = createEntry(i); - code = raftCachePutEntry(pCache, pEntry); - ASSERT(code == 1); - syncEntryDestory(pEntry); - } - raftCacheLog2((char*)"==test4 write 5 entries==", pCache); - - SyncIndex index; - index = 3; - SSyncRaftEntry* pEntry; - code = raftCacheGetAndDel(pCache, index, &pEntry); - ASSERT(code == 0); - syncEntryLog2((char*)"==test4 get-and-del entry 3==", pEntry); - raftCacheLog2((char*)"==test4 after get-and-del entry 3==", pCache); -} - -static char* keyFn(const void* pData) { - SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData; - return (char*)(&(pEntry->index)); -} - -static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); } - -void printSkipList(SSkipList* pSkipList) { - ASSERT(pSkipList != NULL); - - SSkipListIterator* pIter = tSkipListCreateIter(pSkipList); - while (tSkipListIterNext(pIter)) { - SSkipListNode* pNode = tSkipListIterGet(pIter); - ASSERT(pNode != NULL); - SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode); - syncEntryPrint2((char*)"", pEntry); - } -} - -void delSkipListFirst(SSkipList* pSkipList, int n) { - ASSERT(pSkipList != NULL); - - sTrace("delete first %d -------------", n); - SSkipListIterator* pIter = tSkipListCreateIter(pSkipList); - for (int i = 0; i < n; ++i) { - tSkipListIterNext(pIter); - SSkipListNode* pNode = tSkipListIterGet(pIter); - tSkipListRemoveNode(pSkipList, pNode); + code = raftEntryCachePutEntry(pCache, pEntry); + sTrace("put entry code:%d, pEntry:%p", code, pEntry); } -} - - -SSyncRaftEntry* getLogEntry2(SSkipList* pSkipList, SyncIndex index) { - SyncIndex index2 = index; - SSyncRaftEntry *pEntry = NULL; - int arraySize = 0; - - SArray* entryPArray = tSkipListGet(pSkipList, (char*)(&index2)); - arraySize = taosArrayGetSize(entryPArray); - if (arraySize > 0) { - SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(entryPArray, 0); - ASSERT(*ppNode != NULL); - pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode); - } - taosArrayDestroy(entryPArray); - - sTrace("get index2: %ld, arraySize:%d -------------", index, arraySize); - syncEntryLog2((char*)"getLogEntry2", pEntry); - return pEntry; -} - - -SSyncRaftEntry* getLogEntry(SSkipList* pSkipList, SyncIndex index) { - sTrace("get index: %ld -------------", index); - SyncIndex index2 = index; - SSyncRaftEntry *pEntry = NULL; - SSkipListIterator* pIter = tSkipListCreateIterFromVal(pSkipList, (const char *)&index2, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); - if (tSkipListIterNext(pIter)) { - SSkipListNode* pNode = tSkipListIterGet(pIter); - ASSERT(pNode != NULL); - pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode); - } - - syncEntryLog2((char*)"getLogEntry", pEntry); - return pEntry; -} - -void test5() { - SSkipList* pSkipList = - tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn); - ASSERT(pSkipList != NULL); - - sTrace("insert 9 - 5"); for (int i = 9; i >= 5; --i) { SSyncRaftEntry* pEntry = createEntry(i); - SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry); + code = raftEntryCachePutEntry(pCache, pEntry); + sTrace("put entry code:%d, pEntry:%p", code, pEntry); } - - sTrace("insert 0 - 4"); - for (int i = 0; i <= 4; ++i) { - SSyncRaftEntry* pEntry = createEntry(i); - SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry); - } - - sTrace("insert 7 7 7 7 7"); - for (int i = 0; i <= 4; ++i) { - SSyncRaftEntry* pEntry = createEntry(7); - SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry); - } - - sTrace("print: -------------"); - printSkipList(pSkipList); - - delSkipListFirst(pSkipList, 3); - - sTrace("print: -------------"); - printSkipList(pSkipList); - - getLogEntry(pSkipList, 2); - getLogEntry(pSkipList, 5); - getLogEntry(pSkipList, 7); - getLogEntry(pSkipList, 7); - - getLogEntry2(pSkipList, 2); - getLogEntry2(pSkipList, 5); - getLogEntry2(pSkipList, 7); - getLogEntry2(pSkipList, 7); - - - tSkipListDestroy(pSkipList); + raftEntryCacheLog2((char*)"==test3 write 10 entries==", pCache); } int main(int argc, char** argv) { @@ -266,14 +126,9 @@ int main(int argc, char** argv) { tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_DEBUG; - /* - test1(); - test2(); - test3(); - test4(); - */ - - test5(); + test1(); + test2(); + test3(); return 0; } diff --git a/source/libs/sync/test/syncHashCacheTest.cpp b/source/libs/sync/test/syncHashCacheTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f155bd834fb1ac860c72372b1a99ad7a3f0b1474 --- /dev/null +++ b/source/libs/sync/test/syncHashCacheTest.cpp @@ -0,0 +1,277 @@ +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" +#include "tskiplist.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SSyncRaftEntry* createEntry(int i) { + int32_t dataLen = 20; + SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); + assert(pEntry != NULL); + pEntry->msgType = 88; + pEntry->originalRpcType = 99; + pEntry->seqNum = 3; + pEntry->isWeak = true; + pEntry->term = 100 + i; + pEntry->index = i; + snprintf(pEntry->data, dataLen, "value%d", i); + + return pEntry; +} + +SSyncNode* createFakeNode() { + SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); + ASSERT(pSyncNode != NULL); + memset(pSyncNode, 0, sizeof(SSyncNode)); + + return pSyncNode; +} + +SRaftEntryHashCache* createCache(int maxCount) { + SSyncNode* pSyncNode = createFakeNode(); + ASSERT(pSyncNode != NULL); + + SRaftEntryHashCache* pCache = raftCacheCreate(pSyncNode, maxCount); + ASSERT(pCache != NULL); + + return pCache; +} + +void test1() { + int32_t code = 0; + SRaftEntryHashCache* pCache = createCache(5); + for (int i = 0; i < 5; ++i) { + SSyncRaftEntry* pEntry = createEntry(i); + code = raftCachePutEntry(pCache, pEntry); + ASSERT(code == 1); + syncEntryDestory(pEntry); + } + raftCacheLog2((char*)"==test1 write 5 entries==", pCache); + + SyncIndex index; + index = 1; + code = raftCacheDelEntry(pCache, index); + ASSERT(code == 0); + index = 3; + code = raftCacheDelEntry(pCache, index); + ASSERT(code == 0); + raftCacheLog2((char*)"==test1 delete 1,3==", pCache); + + code = raftCacheClear(pCache); + ASSERT(code == 0); + raftCacheLog2((char*)"==clear all==", pCache); +} + +void test2() { + int32_t code = 0; + SRaftEntryHashCache* pCache = createCache(5); + for (int i = 0; i < 5; ++i) { + SSyncRaftEntry* pEntry = createEntry(i); + code = raftCachePutEntry(pCache, pEntry); + ASSERT(code == 1); + syncEntryDestory(pEntry); + } + raftCacheLog2((char*)"==test2 write 5 entries==", pCache); + + SyncIndex index; + index = 1; + SSyncRaftEntry* pEntry; + code = raftCacheGetEntry(pCache, index, &pEntry); + ASSERT(code == 0); + syncEntryDestory(pEntry); + syncEntryLog2((char*)"==test2 get entry 1==", pEntry); + + index = 2; + code = raftCacheGetEntryP(pCache, index, &pEntry); + ASSERT(code == 0); + syncEntryLog2((char*)"==test2 get entry pointer 2==", pEntry); + + // not found + index = 8; + code = raftCacheGetEntry(pCache, index, &pEntry); + ASSERT(code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); + sTrace("==test2 get entry 8 not found=="); + + // not found + index = 9; + code = raftCacheGetEntryP(pCache, index, &pEntry); + ASSERT(code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); + sTrace("==test2 get entry pointer 9 not found=="); +} + +void test3() { + int32_t code = 0; + SRaftEntryHashCache* pCache = createCache(5); + for (int i = 0; i < 5; ++i) { + SSyncRaftEntry* pEntry = createEntry(i); + code = raftCachePutEntry(pCache, pEntry); + ASSERT(code == 1); + syncEntryDestory(pEntry); + } + for (int i = 6; i < 10; ++i) { + SSyncRaftEntry* pEntry = createEntry(i); + code = raftCachePutEntry(pCache, pEntry); + ASSERT(code == 0); + syncEntryDestory(pEntry); + } + raftCacheLog2((char*)"==test3 write 10 entries, max count is 5==", pCache); +} + +void test4() { + int32_t code = 0; + SRaftEntryHashCache* pCache = createCache(5); + for (int i = 0; i < 5; ++i) { + SSyncRaftEntry* pEntry = createEntry(i); + code = raftCachePutEntry(pCache, pEntry); + ASSERT(code == 1); + syncEntryDestory(pEntry); + } + raftCacheLog2((char*)"==test4 write 5 entries==", pCache); + + SyncIndex index; + index = 3; + SSyncRaftEntry* pEntry; + code = raftCacheGetAndDel(pCache, index, &pEntry); + ASSERT(code == 0); + syncEntryLog2((char*)"==test4 get-and-del entry 3==", pEntry); + raftCacheLog2((char*)"==test4 after get-and-del entry 3==", pCache); +} + +static char* keyFn(const void* pData) { + SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData; + return (char*)(&(pEntry->index)); +} + +static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); } + +void printSkipList(SSkipList* pSkipList) { + ASSERT(pSkipList != NULL); + + SSkipListIterator* pIter = tSkipListCreateIter(pSkipList); + while (tSkipListIterNext(pIter)) { + SSkipListNode* pNode = tSkipListIterGet(pIter); + ASSERT(pNode != NULL); + SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode); + syncEntryPrint2((char*)"", pEntry); + } +} + +void delSkipListFirst(SSkipList* pSkipList, int n) { + ASSERT(pSkipList != NULL); + + sTrace("delete first %d -------------", n); + SSkipListIterator* pIter = tSkipListCreateIter(pSkipList); + for (int i = 0; i < n; ++i) { + tSkipListIterNext(pIter); + SSkipListNode* pNode = tSkipListIterGet(pIter); + tSkipListRemoveNode(pSkipList, pNode); + } +} + +SSyncRaftEntry* getLogEntry2(SSkipList* pSkipList, SyncIndex index) { + SyncIndex index2 = index; + SSyncRaftEntry* pEntry = NULL; + int arraySize = 0; + + SArray* entryPArray = tSkipListGet(pSkipList, (char*)(&index2)); + arraySize = taosArrayGetSize(entryPArray); + if (arraySize > 0) { + SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(entryPArray, 0); + ASSERT(*ppNode != NULL); + pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode); + } + taosArrayDestroy(entryPArray); + + sTrace("get index2: %ld, arraySize:%d -------------", index, arraySize); + syncEntryLog2((char*)"getLogEntry2", pEntry); + return pEntry; +} + +SSyncRaftEntry* getLogEntry(SSkipList* pSkipList, SyncIndex index) { + sTrace("get index: %ld -------------", index); + SyncIndex index2 = index; + SSyncRaftEntry* pEntry = NULL; + SSkipListIterator* pIter = + tSkipListCreateIterFromVal(pSkipList, (const char*)&index2, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + if (tSkipListIterNext(pIter)) { + SSkipListNode* pNode = tSkipListIterGet(pIter); + ASSERT(pNode != NULL); + pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode); + } + + syncEntryLog2((char*)"getLogEntry", pEntry); + return pEntry; +} + +void test5() { + SSkipList* pSkipList = + tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn); + ASSERT(pSkipList != NULL); + + sTrace("insert 9 - 5"); + for (int i = 9; i >= 5; --i) { + SSyncRaftEntry* pEntry = createEntry(i); + SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry); + } + + sTrace("insert 0 - 4"); + for (int i = 0; i <= 4; ++i) { + SSyncRaftEntry* pEntry = createEntry(i); + SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry); + } + + sTrace("insert 7 7 7 7 7"); + for (int i = 0; i <= 4; ++i) { + SSyncRaftEntry* pEntry = createEntry(7); + SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry); + } + + sTrace("print: -------------"); + printSkipList(pSkipList); + + delSkipListFirst(pSkipList, 3); + + sTrace("print: -------------"); + printSkipList(pSkipList); + + getLogEntry(pSkipList, 2); + getLogEntry(pSkipList, 5); + getLogEntry(pSkipList, 7); + getLogEntry(pSkipList, 7); + + getLogEntry2(pSkipList, 2); + getLogEntry2(pSkipList, 5); + getLogEntry2(pSkipList, 7); + getLogEntry2(pSkipList, 7); + + tSkipListDestroy(pSkipList); +} + +int main(int argc, char** argv) { + gRaftDetailLog = true; + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_DEBUG; + + /* + test1(); + test2(); + test3(); + test4(); + */ + + test5(); + + return 0; +}