提交 cedad48a 编写于 作者: M Minghao Li

refactor(sync): add ref in log entry

上级 fba5850b
......@@ -26,6 +26,7 @@ extern "C" {
#include "syncInt.h"
#include "syncMessage.h"
#include "taosdef.h"
#include "tref.h"
#include "tskiplist.h"
typedef struct SSyncRaftEntry {
......@@ -89,6 +90,7 @@ typedef struct SRaftEntryCache {
SSkipList* pSkipList;
int32_t maxCount;
int32_t currentCount;
int32_t refMgr;
TdThreadMutex mutex;
SSyncNode* pSyncNode;
} SRaftEntryCache;
......
......@@ -242,9 +242,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
}
static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param;
SSyncIO * io = param;
STaosQall *qall = taosAllocateQall();
SRpcMsg *pRpcMsg, rpcMsg;
SRpcMsg * pRpcMsg, rpcMsg;
SQueueInfo qinfo = {0};
while (1) {
......
......@@ -125,7 +125,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......
......@@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
if (pSyncCfg != NULL) {
int32_t len = 512;
char *s = taosMemoryMalloc(len);
char * s = taosMemoryMalloc(len);
memset(s, 0, len);
snprintf(s, len, "{r-num:%d, my:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
......@@ -206,7 +206,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -285,7 +285,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
}
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0);
......
......@@ -23,6 +23,7 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) {
memset(pEntry, 0, bytes);
pEntry->bytes = bytes;
pEntry->dataLen = dataLen;
pEntry->rid = -1;
return pEntry;
}
......@@ -451,6 +452,11 @@ static char* keyFn(const void* pData) {
static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); }
static void freeRaftEntry(void* param) {
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)param;
syncEntryDestory(pEntry);
}
SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache));
if (pCache == NULL) {
......@@ -466,6 +472,7 @@ SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
}
taosThreadMutexInit(&(pCache->mutex), NULL);
pCache->refMgr = taosOpenRef(10, freeRaftEntry);
pCache->maxCount = maxCount;
pCache->currentCount = 0;
pCache->pSyncNode = pSyncNode;
......@@ -477,6 +484,10 @@ void raftEntryCacheDestroy(SRaftEntryCache* pCache) {
if (pCache != NULL) {
taosThreadMutexLock(&(pCache->mutex));
tSkipListDestroy(pCache->pSkipList);
if (pCache->refMgr != -1) {
taosCloseRef(pCache->refMgr);
pCache->refMgr = -1;
}
taosThreadMutexUnlock(&(pCache->mutex));
taosThreadMutexDestroy(&(pCache->mutex));
taosMemoryFree(pCache);
......@@ -498,6 +509,9 @@ int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* p
ASSERT(pSkipListNode != NULL);
++(pCache->currentCount);
pEntry->rid = taosAddRef(pCache->refMgr, pEntry);
ASSERT(pEntry->rid >= 0);
do {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
......@@ -520,6 +534,7 @@ int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index,
if (code == 1) {
*ppEntry = taosMemoryMalloc(pEntry->bytes);
memcpy(*ppEntry, pEntry, pEntry->bytes);
(*ppEntry)->rid = -1;
} else {
*ppEntry = NULL;
}
......@@ -541,6 +556,7 @@ int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index,
SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(entryPArray, 0);
ASSERT(*ppNode != NULL);
*ppEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode);
taosAcquireRef(pCache->refMgr, (*ppEntry)->rid);
code = 1;
} else if (arraySize == 0) {
......@@ -600,7 +616,9 @@ int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) {
taosArrayPush(delNodeArray, &pNode);
++returnCnt;
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
syncEntryDestory(pEntry);
// syncEntryDestory(pEntry);
taosRemoveRef(pCache->refMgr, pEntry->rid);
}
tSkipListDestroyIter(pIter);
......
......@@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
char *raftStore2Str(SRaftStore *pRaftStore) {
cJSON *pJson = raftStore2Json(pRaftStore);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......
......@@ -129,7 +129,7 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
while (pStub) {
size_t len;
void *key = taosHashGetKey(pStub, &len);
void * key = taosHashGetKey(pStub, &len);
uint64_t *pSeqNum = (uint64_t *)key;
sum++;
......
......@@ -374,14 +374,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t len = 256;
char *s = taosMemoryMalloc(len);
char * s = taosMemoryMalloc(len);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
char host[64];
......@@ -653,7 +653,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId;
cJSON * pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
......@@ -686,14 +686,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
int32_t len = 256;
char *s = taosMemoryMalloc(len);
char * s = taosMemoryMalloc(len);
SRaftId fromId = pReceiver->fromId;
char host[128];
......
......@@ -125,7 +125,7 @@ int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter)
return 0;
}
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) {
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot) {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d", pFsm, pWriter,
isApply);
......
......@@ -5,8 +5,8 @@
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "tskiplist.h"
#include "tref.h"
#include "tskiplist.h"
void logTest() {
sTrace("--- sync log test: trace");
......@@ -51,7 +51,7 @@ SRaftEntryCache* createCache(int maxCount) {
}
void test1() {
int32_t code = 0;
int32_t code = 0;
SRaftEntryCache* pCache = createCache(5);
for (int i = 0; i < 10; ++i) {
SSyncRaftEntry* pEntry = createEntry(i);
......@@ -68,7 +68,7 @@ void test1() {
}
void test2() {
int32_t code = 0;
int32_t code = 0;
SRaftEntryCache* pCache = createCache(5);
for (int i = 0; i < 10; ++i) {
SSyncRaftEntry* pEntry = createEntry(i);
......@@ -77,7 +77,7 @@ void test2() {
}
raftEntryCacheLog2((char*)"==test1 write 5 entries==", pCache);
SyncIndex index = 2;
SyncIndex index = 2;
SSyncRaftEntry* pEntry = NULL;
code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
......@@ -107,7 +107,7 @@ void test2() {
}
void test3() {
int32_t code = 0;
int32_t code = 0;
SRaftEntryCache* pCache = createCache(20);
for (int i = 0; i <= 4; ++i) {
SSyncRaftEntry* pEntry = createEntry(i);
......@@ -122,8 +122,6 @@ void test3() {
raftEntryCacheLog2((char*)"==test3 write 10 entries==", pCache);
}
static void freeObj(void* param) {
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)param;
syncEntryLog2((char*)"freeObj: ", pEntry);
......@@ -138,7 +136,7 @@ void test4() {
int64_t rid = taosAddRef(testRefId, pEntry);
sTrace("rid: %ld", rid);
do {
SSyncRaftEntry* pAcquireEntry = (SSyncRaftEntry*)taosAcquireRef(testRefId, rid);
syncEntryLog2((char*)"acquire: ", pAcquireEntry);
......@@ -147,8 +145,8 @@ void test4() {
taosAcquireRef(testRefId, rid);
taosAcquireRef(testRefId, rid);
//taosReleaseRef(testRefId, rid);
//taosReleaseRef(testRefId, rid);
// taosReleaseRef(testRefId, rid);
// taosReleaseRef(testRefId, rid);
} while (0);
taosRemoveRef(testRefId, rid);
......@@ -180,13 +178,13 @@ int main(int argc, char** argv) {
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_DEBUG;
/*
test1();
test2();
test3();
*/
/*
test1();
test2();
test3();
*/
test4();
//test5();
// test5();
return 0;
}
......@@ -30,7 +30,7 @@ int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { return 0; }
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) { return 0; }
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot) { return 0; }
int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; }
SSyncSnapshotReceiver* createReceiver() {
......
......@@ -126,7 +126,7 @@ int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter)
return 0;
}
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) {
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot) {
if (isApply) {
gSnapshotLastApplyIndex = gFinishLastApplyIndex;
gSnapshotLastApplyTerm = gFinishLastApplyTerm;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册