提交 f556d981 编写于 作者: M Minghao Li

sync refactor

上级 c9c48e87
...@@ -120,11 +120,11 @@ typedef struct SSyncNode { ...@@ -120,11 +120,11 @@ typedef struct SSyncNode {
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
// init internal // init internal
SNodeInfo me; SNodeInfo myNodeInfo;
SRaftId raftId; SRaftId myRaftId;
int32_t peersNum; int32_t peersNum;
SNodeInfo peers[TSDB_MAX_REPLICA]; SNodeInfo peersNodeInfo[TSDB_MAX_REPLICA];
SRaftId peersId[TSDB_MAX_REPLICA]; SRaftId peersId[TSDB_MAX_REPLICA];
int32_t replicaNum; int32_t replicaNum;
......
...@@ -85,20 +85,20 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -85,20 +85,20 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
// init internal // init internal
pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex]; pSyncNode->myNodeInfo = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncInfo->vgId, &pSyncNode->raftId); syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncInfo->vgId, &pSyncNode->myRaftId);
// init peersNum, peers, peersId // init peersNum, peers, peersId
pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1; pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;
int j = 0; int j = 0;
for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) { for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
if (i != pSyncInfo->syncCfg.myIndex) { if (i != pSyncInfo->syncCfg.myIndex) {
pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i]; pSyncNode->peersNodeInfo[j] = pSyncInfo->syncCfg.nodeInfo[i];
j++; j++;
} }
} }
for (int i = 0; i < pSyncNode->peersNum; ++i) { for (int i = 0; i < pSyncNode->peersNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncInfo->vgId, &pSyncNode->peersId[i]); syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncInfo->vgId, &pSyncNode->peersId[i]);
} }
// init replicaNum, replicasId // init replicaNum, replicasId
...@@ -190,16 +190,16 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { ...@@ -190,16 +190,16 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf); cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf);
// init internal // init internal
cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->me); cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
cJSON_AddItemToObject(pRoot, "me", pMe); cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe);
cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->raftId); cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId);
cJSON_AddItemToObject(pRoot, "raftId", pRaftId); cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId);
cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum); cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum);
cJSON* pPeers = cJSON_CreateArray(); cJSON* pPeers = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "peers", pPeers); cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers);
for (int i = 0; i < pSyncNode->peersNum; ++i) { for (int i = 0; i < pSyncNode->peersNum; ++i) {
cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peers[i])); cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i]));
} }
cJSON* pPeersId = cJSON_CreateArray(); cJSON* pPeersId = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "peersId", pPeersId); cJSON_AddItemToObject(pRoot, "peersId", pPeersId);
...@@ -222,7 +222,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { ...@@ -222,7 +222,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache); cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);
// tla+ server vars // tla+ server vars
cJSON_AddStringToObject(pRoot, "state", syncUtilState2String(pSyncNode->state)); cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
// tla+ candidate vars // tla+ candidate vars
...@@ -283,7 +284,7 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) { ...@@ -283,7 +284,7 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) {
for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) { for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
SRaftId destId; SRaftId destId;
syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId); syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId);
SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId); SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId);
ret = syncNodePing(pSyncNode, &destId, pMsg); ret = syncNodePing(pSyncNode, &destId, pMsg);
assert(ret == 0); assert(ret == 0);
syncPingDestroy(pMsg); syncPingDestroy(pMsg);
...@@ -294,8 +295,8 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) { ...@@ -294,8 +295,8 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) { for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId destId; SRaftId destId;
syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &destId); syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &destId);
SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId); SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId);
ret = syncNodePing(pSyncNode, &destId, pMsg); ret = syncNodePing(pSyncNode, &destId, pMsg);
assert(ret == 0); assert(ret == 0);
syncPingDestroy(pMsg); syncPingDestroy(pMsg);
...@@ -304,7 +305,7 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) { ...@@ -304,7 +305,7 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
int32_t syncNodePingSelf(SSyncNode* pSyncNode) { int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
int32_t ret; int32_t ret;
SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId); SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId);
ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg); ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
assert(ret == 0); assert(ret == 0);
syncPingDestroy(pMsg); syncPingDestroy(pMsg);
...@@ -385,7 +386,7 @@ static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { ...@@ -385,7 +386,7 @@ static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
cJSON_Delete(pJson); cJSON_Delete(pJson);
} }
SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->raftId, &pMsg->srcId); SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsgReply, &rpcMsg); syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
...@@ -485,7 +486,7 @@ static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { ...@@ -485,7 +486,7 @@ static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
// //
static void syncNodeBecomeLeader(SSyncNode* pSyncNode) { static void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
pSyncNode->state = TAOS_SYNC_STATE_LEADER; pSyncNode->state = TAOS_SYNC_STATE_LEADER;
pSyncNode->leaderCache = pSyncNode->raftId; pSyncNode->leaderCache = pSyncNode->myRaftId;
// next Index +=1 // next Index +=1
// match Index = 0; // match Index = 0;
......
...@@ -84,7 +84,7 @@ int main(int argc, char** argv) { ...@@ -84,7 +84,7 @@ int main(int argc, char** argv) {
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->raftId, &pSyncNode->raftId); SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册