提交 71296450 编写于 作者: M Minghao Li

sync refactor

上级 82144e35
...@@ -142,7 +142,6 @@ typedef struct SSyncNode { ...@@ -142,7 +142,6 @@ typedef struct SSyncNode {
SRaftId leaderCache; SRaftId leaderCache;
// life cycle // life cycle
int32_t refCount;
int64_t rid; int64_t rid;
// tla+ server vars // tla+ server vars
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "syncTimeout.h" #include "syncTimeout.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
#include "tref.h"
static int32_t tsNodeRefId = -1; static int32_t tsNodeRefId = -1;
...@@ -44,31 +45,57 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); ...@@ -44,31 +45,57 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg); static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
// life cycle
static void syncFreeNode(void* param);
// --------------------------------- // ---------------------------------
int32_t syncInit() { int32_t syncInit() {
int32_t ret = syncEnvStart(); int32_t ret;
tsNodeRefId = taosOpenRef(200, syncFreeNode);
if (tsNodeRefId < 0) {
sError("failed to init node ref");
syncCleanUp();
ret = -1;
} else {
ret = syncEnvStart();
}
return ret; return ret;
} }
void syncCleanUp() { void syncCleanUp() {
int32_t ret = syncEnvStop(); int32_t ret = syncEnvStop();
assert(ret == 0); assert(ret == 0);
if (tsNodeRefId != -1) {
taosCloseRef(tsNodeRefId);
tsNodeRefId = -1;
}
} }
int64_t syncStart(const SSyncInfo* pSyncInfo) { int64_t syncStart(const SSyncInfo* pSyncInfo) {
int32_t ret = 0;
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
// todo : return ref id pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode);
return ret; if (pSyncNode->rid < 0) {
syncFreeNode(pSyncNode);
return -1;
}
return pSyncNode->rid;
} }
void syncStop(int64_t rid) { void syncStop(int64_t rid) {
// todo : get pointer from rid SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
SSyncNode* pSyncNode = NULL; if (pSyncNode == NULL) {
return;
}
syncNodeClose(pSyncNode); syncNodeClose(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
taosRemoveRef(tsNodeRefId, rid);
} }
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
...@@ -155,7 +182,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -155,7 +182,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->quorum = syncUtilQuorum(pSyncInfo->syncCfg.replicaNum); pSyncNode->quorum = syncUtilQuorum(pSyncInfo->syncCfg.replicaNum);
pSyncNode->leaderCache = EMPTY_RAFT_ID; pSyncNode->leaderCache = EMPTY_RAFT_ID;
// init life cycle // init life cycle outside
// TLA+ Spec // TLA+ Spec
// InitHistoryVars == /\ elections = {} // InitHistoryVars == /\ elections = {}
...@@ -444,6 +471,10 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { ...@@ -444,6 +471,10 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache); cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache); cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);
// life cycle
snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->rid);
cJSON_AddStringToObject(pRoot, "rid", u64buf);
// tla+ server vars // tla+ server vars
cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state); cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state)); cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
...@@ -813,3 +844,10 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg ...@@ -813,3 +844,10 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
return ret; return ret;
} }
static void syncFreeNode(void* param) {
SSyncNode* pNode = param;
syncNodePrint2((char*)"==syncFreeNode==", pNode);
free(pNode);
}
\ No newline at end of file
...@@ -82,7 +82,7 @@ void stop(int64_t rid) { ...@@ -82,7 +82,7 @@ void stop(int64_t rid) {
} }
void *func(void *param) { void *func(void *param) {
int64_t rid = (int64_t)param; int64_t rid = (int64_t)param;
int32_t ms = taosRand() % 10000; int32_t ms = taosRand() % 10000;
taosMsleep(ms); taosMsleep(ms);
...@@ -115,7 +115,7 @@ int main() { ...@@ -115,7 +115,7 @@ int main() {
for (int i = 0; i < 20; ++i) { for (int i = 0; i < 20; ++i) {
pthread_t tid; pthread_t tid;
pthread_create(&tid, NULL, func, (void*)rid); pthread_create(&tid, NULL, func, (void *)rid);
} }
int32_t ms = taosRand() % 10000; int32_t ms = taosRand() % 10000;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册