diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 607ff4ba24cae3ef1985e84a58bb0de44d8fbec4..bab1dcc661a90c0d9747d37699d64b747526495d 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -26,6 +26,7 @@ extern "C" { #include "syncInt.h" #include "syncMessage.h" #include "taosdef.h" +#include "tref.h" #include "tskiplist.h" typedef struct SSyncRaftEntry { @@ -89,6 +90,7 @@ typedef struct SRaftEntryCache { SSkipList* pSkipList; int32_t maxCount; int32_t currentCount; + int32_t refMgr; TdThreadMutex mutex; SSyncNode* pSyncNode; } SRaftEntryCache; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index d9f11ba80f925ad2b73e443169968a54d10209f3..b00be6edb6a6b9473ea1483a4bb508033151f313 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -242,9 +242,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) { } static void *syncIOConsumerFunc(void *param) { - SSyncIO *io = param; + SSyncIO * io = param; STaosQall *qall = taosAllocateQall(); - SRpcMsg *pRpcMsg, rpcMsg; + SRpcMsg * pRpcMsg, rpcMsg; SQueueInfo qinfo = {0}; while (1) { diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 39bede23f6c65abb71169ea82cf0647803275642..8634676f8686d5ac0249855ca00db5f45b59cd4e 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -125,7 +125,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index c634a1bf4995cb161ac96b772d3fc0acc0da8b37..56666b35b621d48d99c5a55063bd67c053d59776 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) { cJSON *pJson = syncCfg2Json(pSyncCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { if (pSyncCfg != NULL) { int32_t len = 512; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); memset(s, 0, len); snprintf(s, len, "{r-num:%d, my:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); @@ -206,7 +206,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) { cJSON *pJson = raftCfg2Json(pRaftCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -285,7 +285,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); } - cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); + cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); ASSERT(code == 0); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 64cff4d93d710a600ca2a2bf34d329f296477fa9..4687fc41c4c9b783fc9d69c17a8a273d305746bb 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -23,6 +23,7 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) { memset(pEntry, 0, bytes); pEntry->bytes = bytes; pEntry->dataLen = dataLen; + pEntry->rid = -1; return pEntry; } @@ -451,6 +452,11 @@ static char* keyFn(const void* pData) { static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); } +static void freeRaftEntry(void* param) { + SSyncRaftEntry* pEntry = (SSyncRaftEntry*)param; + syncEntryDestory(pEntry); +} + SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) { SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache)); if (pCache == NULL) { @@ -466,6 +472,7 @@ SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) { } taosThreadMutexInit(&(pCache->mutex), NULL); + pCache->refMgr = taosOpenRef(10, freeRaftEntry); pCache->maxCount = maxCount; pCache->currentCount = 0; pCache->pSyncNode = pSyncNode; @@ -477,6 +484,10 @@ void raftEntryCacheDestroy(SRaftEntryCache* pCache) { if (pCache != NULL) { taosThreadMutexLock(&(pCache->mutex)); tSkipListDestroy(pCache->pSkipList); + if (pCache->refMgr != -1) { + taosCloseRef(pCache->refMgr); + pCache->refMgr = -1; + } taosThreadMutexUnlock(&(pCache->mutex)); taosThreadMutexDestroy(&(pCache->mutex)); taosMemoryFree(pCache); @@ -498,6 +509,9 @@ int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* p ASSERT(pSkipListNode != NULL); ++(pCache->currentCount); + pEntry->rid = taosAddRef(pCache->refMgr, pEntry); + ASSERT(pEntry->rid >= 0); + do { char eventLog[128]; snprintf(eventLog, sizeof(eventLog), "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d", @@ -520,6 +534,7 @@ int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, if (code == 1) { *ppEntry = taosMemoryMalloc(pEntry->bytes); memcpy(*ppEntry, pEntry, pEntry->bytes); + (*ppEntry)->rid = -1; } else { *ppEntry = NULL; } @@ -541,6 +556,7 @@ int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(entryPArray, 0); ASSERT(*ppNode != NULL); *ppEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode); + taosAcquireRef(pCache->refMgr, (*ppEntry)->rid); code = 1; } else if (arraySize == 0) { @@ -600,7 +616,9 @@ int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) { taosArrayPush(delNodeArray, &pNode); ++returnCnt; SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode); - syncEntryDestory(pEntry); + + // syncEntryDestory(pEntry); + taosRemoveRef(pCache->refMgr, pEntry->rid); } tSkipListDestroyIter(pIter); diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index fbfeb031f63c244d0d5a7c9cd689815d2e0f9fba..908587303624a305e03fcd7034417394eb9e71c0 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) { char *raftStore2Str(SRaftStore *pRaftStore) { cJSON *pJson = raftStore2Json(pRaftStore); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 3c63b7692a95f7a91370d4cead683919dba65d2f..501f1fb4354d1230d7d13e9de695579126a6407e 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -129,7 +129,7 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) { while (pStub) { size_t len; - void *key = taosHashGetKey(pStub, &len); + void * key = taosHashGetKey(pStub, &len); uint64_t *pSeqNum = (uint64_t *)key; sum++; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 279a70cb19a362bb1350fa1df8cabc104bce5870..702e9f01dcb250b8c07c776865f8f274eff89f20 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -374,14 +374,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; char host[64]; @@ -653,7 +653,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON *pTmp = pFromId; + cJSON * pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -686,14 +686,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId fromId = pReceiver->fromId; char host[128]; diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 8c6c68bbf820bd9bf7cead1cf1e1fe63ed030b40..714d233984072890a269b66f0062ae21e1f539e2 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -125,7 +125,7 @@ int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) return 0; } -int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) { +int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot) { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d", pFsm, pWriter, isApply); diff --git a/source/libs/sync/test/syncEntryCacheTest.cpp b/source/libs/sync/test/syncEntryCacheTest.cpp index 1b3d68f95922086f2578f1956513a9f46a7a731b..fd0c41cce9c8ed1d37abe000e5404926f7729fc1 100644 --- a/source/libs/sync/test/syncEntryCacheTest.cpp +++ b/source/libs/sync/test/syncEntryCacheTest.cpp @@ -5,8 +5,8 @@ #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" -#include "tskiplist.h" #include "tref.h" +#include "tskiplist.h" void logTest() { sTrace("--- sync log test: trace"); @@ -51,7 +51,7 @@ SRaftEntryCache* createCache(int maxCount) { } void test1() { - int32_t code = 0; + int32_t code = 0; SRaftEntryCache* pCache = createCache(5); for (int i = 0; i < 10; ++i) { SSyncRaftEntry* pEntry = createEntry(i); @@ -68,7 +68,7 @@ void test1() { } void test2() { - int32_t code = 0; + int32_t code = 0; SRaftEntryCache* pCache = createCache(5); for (int i = 0; i < 10; ++i) { SSyncRaftEntry* pEntry = createEntry(i); @@ -77,7 +77,7 @@ void test2() { } raftEntryCacheLog2((char*)"==test1 write 5 entries==", pCache); - SyncIndex index = 2; + SyncIndex index = 2; SSyncRaftEntry* pEntry = NULL; code = raftEntryCacheGetEntryP(pCache, index, &pEntry); @@ -107,7 +107,7 @@ void test2() { } void test3() { - int32_t code = 0; + int32_t code = 0; SRaftEntryCache* pCache = createCache(20); for (int i = 0; i <= 4; ++i) { SSyncRaftEntry* pEntry = createEntry(i); @@ -122,8 +122,6 @@ void test3() { raftEntryCacheLog2((char*)"==test3 write 10 entries==", pCache); } - - static void freeObj(void* param) { SSyncRaftEntry* pEntry = (SSyncRaftEntry*)param; syncEntryLog2((char*)"freeObj: ", pEntry); @@ -138,19 +136,41 @@ void test4() { int64_t rid = taosAddRef(testRefId, pEntry); sTrace("rid: %ld", rid); - + do { SSyncRaftEntry* pAcquireEntry = (SSyncRaftEntry*)taosAcquireRef(testRefId, rid); syncEntryLog2((char*)"acquire: ", pAcquireEntry); + taosAcquireRef(testRefId, rid); taosAcquireRef(testRefId, rid); taosAcquireRef(testRefId, rid); - taosReleaseRef(testRefId, rid); - //taosReleaseRef(testRefId, rid); + // taosReleaseRef(testRefId, rid); + // taosReleaseRef(testRefId, rid); } while (0); taosRemoveRef(testRefId, rid); + + for (int i = 0; i < 10; ++i) { + sTrace("taosReleaseRef, %d", i); + taosReleaseRef(testRefId, rid); + } +} + +void test5() { + int32_t testRefId = taosOpenRef(5, freeObj); + for (int i = 0; i < 100; i++) { + SSyncRaftEntry* pEntry = createEntry(i); + ASSERT(pEntry != NULL); + + int64_t rid = taosAddRef(testRefId, pEntry); + sTrace("rid: %ld", rid); + } + + for (int64_t rid = 2; rid < 101; rid++) { + SSyncRaftEntry* pAcquireEntry = (SSyncRaftEntry*)taosAcquireRef(testRefId, rid); + syncEntryLog2((char*)"taosAcquireRef: ", pAcquireEntry); + } } int main(int argc, char** argv) { @@ -158,11 +178,13 @@ int main(int argc, char** argv) { tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_DEBUG; - test1(); - test2(); - test3(); - - //test4(); + /* + test1(); + test2(); + test3(); + */ + test4(); + // test5(); return 0; } diff --git a/source/libs/sync/test/syncSnapshotReceiverTest.cpp b/source/libs/sync/test/syncSnapshotReceiverTest.cpp index b744843b1e859bdc34a0bb9706782d9c243c5314..0f8e76f12167d71ea6c5919dbff5e567033b580a 100644 --- a/source/libs/sync/test/syncSnapshotReceiverTest.cpp +++ b/source/libs/sync/test/syncSnapshotReceiverTest.cpp @@ -30,7 +30,7 @@ int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { return 0; } -int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) { return 0; } +int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot) { return 0; } int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; } SSyncSnapshotReceiver* createReceiver() { diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index b0a561cb89060ccf0bae93b9803bfd64ba827cf8..9e9769224f18ac29297e8936e7f93fab817612d5 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -126,7 +126,7 @@ int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) return 0; } -int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) { +int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot) { if (isApply) { gSnapshotLastApplyIndex = gFinishLastApplyIndex; gSnapshotLastApplyTerm = gFinishLastApplyTerm; diff --git a/tools/taos-tools b/tools/taos-tools index 0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a..817cb6ac431ed8ae4c843872cdfc8c201c1e1894 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a +Subproject commit 817cb6ac431ed8ae4c843872cdfc8c201c1e1894