未验证 提交 412ff481 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #10565 from taosdata/feature/3.0_mhli

Feature/3.0 mhli
......@@ -138,6 +138,8 @@ typedef struct SSyncInfo {
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
void* queue;
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
} SSyncInfo;
......@@ -147,13 +149,10 @@ typedef struct SSyncNode SSyncNode;
int32_t syncInit();
void syncCleanUp();
int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak);
int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
ESyncState syncGetMyRole(int64_t rid);
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole);
......
......@@ -26,6 +26,9 @@ extern "C" {
#include "syncInt.h"
#include "taosdef.h"
void syncNodeElect(SSyncNode* pSyncNode);
void syncNodeRequestVotePeers(SSyncNode* pSyncNode);
#ifdef __cplusplus
}
#endif
......
......@@ -49,6 +49,7 @@ typedef struct SSyncIO {
int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg);
int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg);
int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg);
int32_t (*FpOnSyncTimeout)(SSyncNode *pSyncNode, SyncTimeout *pMsg);
int8_t isStart;
......@@ -58,9 +59,10 @@ extern SSyncIO *gSyncIO;
int32_t syncIOStart(char *host, uint16_t port);
int32_t syncIOStop();
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t syncIOTickQ();
int32_t syncIOTickPing();
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -69,6 +69,9 @@ extern "C" {
struct SRaft;
typedef struct SRaft SRaft;
struct SyncTimeout;
typedef struct SyncTimeout SyncTimeout;
struct SyncPing;
typedef struct SyncPing SyncPing;
......@@ -111,17 +114,22 @@ typedef struct SSyncNode {
char path[TSDB_FILENAME_LEN];
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
void* queue;
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
// init internal
SNodeInfo me;
SRaftId raftId;
int32_t peersNum;
SNodeInfo peers[TSDB_MAX_REPLICA];
SRaftId peersId[TSDB_MAX_REPLICA];
int32_t replicaNum;
SRaftId replicasId[TSDB_MAX_REPLICA];
// raft algorithm
SSyncFSM* pFsm;
SRaftId raftId;
SRaftId peersId[TSDB_MAX_REPLICA];
int32_t replicaNum;
int32_t quorum;
// life cycle
......@@ -147,19 +155,19 @@ typedef struct SSyncNode {
// timer
tmr_h pPingTimer;
int32_t pingTimerMS;
uint8_t pingTimerStart;
uint8_t pingTimerEnable;
TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp
uint64_t pingTimerCounter;
tmr_h pElectTimer;
int32_t electTimerMS;
uint8_t electTimerStart;
uint8_t electTimerEnable;
TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp
uint64_t electTimerCounter;
tmr_h pHeartbeatTimer;
int32_t heartbeatTimerMS;
uint8_t heartbeatTimerStart;
uint8_t heartbeatTimerEnable;
TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp
uint64_t heartbeatTimerCounter;
......@@ -170,6 +178,7 @@ typedef struct SSyncNode {
int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t (*FpOnTimeout)(SSyncNode* pSyncNode, SyncTimeout* pMsg);
} SSyncNode;
......@@ -178,8 +187,24 @@ void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePingAll(SSyncNode* pSyncNode);
void syncNodePingPeers(SSyncNode* pSyncNode);
void syncNodePingSelf(SSyncNode* pSyncNode);
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms);
void syncNodeBecomeFollower(SSyncNode* pSyncNode);
void syncNodeBecomeLeader(SSyncNode* pSyncNode);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
void syncNodeLeader2Follower(SSyncNode* pSyncNode);
void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
#ifdef __cplusplus
}
......
......@@ -30,6 +30,8 @@ extern "C" {
// encode as uint32
typedef enum ESyncMessageType {
SYNC_UNKNOWN = 77,
SYNC_TIMEOUT = 99,
SYNC_PING = 101,
SYNC_PING_REPLY = 103,
SYNC_CLIENT_REQUEST = 105,
......@@ -38,8 +40,37 @@ typedef enum ESyncMessageType {
SYNC_REQUEST_VOTE_REPLY = 111,
SYNC_APPEND_ENTRIES = 113,
SYNC_APPEND_ENTRIES_REPLY = 115,
} ESyncMessageType;
// ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg);
cJSON* syncRpcUnknownMsg2Json();
// ---------------------------------------------
typedef enum ESyncTimeoutType {
SYNC_TIMEOUT_PING = 100,
SYNC_TIMEOUT_ELECTION,
SYNC_TIMEOUT_HEARTBEAT,
} ESyncTimeoutType;
typedef struct SyncTimeout {
uint32_t bytes;
uint32_t msgType;
ESyncTimeoutType timeoutType;
void* data;
} SyncTimeout;
SyncTimeout* syncTimeoutBuild();
void syncTimeoutDestroy(SyncTimeout* pMsg);
void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen);
void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg);
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg);
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg);
cJSON* syncTimeout2Json(const SyncTimeout* pMsg);
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data);
// ---------------------------------------------
typedef struct SyncPing {
uint32_t bytes;
......
......@@ -26,6 +26,8 @@ extern "C" {
#include "syncInt.h"
#include "taosdef.h"
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
#ifdef __cplusplus
}
#endif
......
......@@ -39,8 +39,9 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId);
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2);
// ---- SSyncBuffer ----
#if 0
void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len);
void syncUtilbufDestroy(SSyncBuffer* syncBuf);
......@@ -48,7 +49,6 @@ void syncUtilbufDestroy(SSyncBuffer* syncBuf);
void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest);
void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest);
#endif
#ifdef __cplusplus
}
......
......@@ -24,13 +24,36 @@ extern "C" {
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "taosdef.h"
typedef struct SVotesGranted {
SyncTerm term;
int32_t quorum;
int32_t votes;
bool toLeader;
SSyncNode *pSyncNode;
} SVotesGranted;
typedef struct SVotesResponded {
} SVotesResponded;
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode);
void voteGrantedDestroy(SVotesGranted *pVotesGranted);
bool voteGrantedMajority(SVotesGranted *pVotesGranted);
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg);
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
typedef struct SVotesRespond {
SRaftId (*replicas)[TSDB_MAX_REPLICA];
bool isRespond[TSDB_MAX_REPLICA];
int32_t replicaNum;
SyncTerm term;
SSyncNode *pSyncNode;
} SVotesRespond;
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode);
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId);
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg);
void Reset(SVotesRespond *pVotesRespond, SyncTerm term);
#ifdef __cplusplus
}
......
......@@ -14,3 +14,7 @@
*/
#include "syncElection.h"
void syncNodeElect(SSyncNode* pSyncNode) {}
void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {}
\ No newline at end of file
......@@ -40,17 +40,6 @@ static void syncIOTickPingFunc(void *param, void *tmrId);
// ----------------------------
// public function ------------
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
sTrace(
"<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, "
"pMsg->msgType:%d, pMsg->contLen:%d",
clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle,
pMsg->msgType, pMsg->contLen);
pMsg->handle = NULL;
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
return 0;
}
int32_t syncIOStart(char *host, uint16_t port) {
gSyncIO = syncIOCreate(host, port);
assert(gSyncIO != NULL);
......@@ -83,6 +72,35 @@ int32_t syncIOTickPing() {
return ret;
}
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
sTrace(
"<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, "
"pMsg->msgType:%d, pMsg->contLen:%d",
clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle,
pMsg->msgType, pMsg->contLen);
{
cJSON *pJson = syncRpcMsg2Json(pMsg);
char *serialized = cJSON_Print(pJson);
sTrace("process syncMessage send: pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
pMsg->handle = NULL;
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
return 0;
}
int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) {
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
STaosQueue *pMsgQ = queue;
taosWriteQitem(pMsgQ, pTemp);
return 0;
}
// local function ------------
static int32_t syncIOStartInternal(SSyncIO *io) {
taosBlockSIGPIPE();
......@@ -193,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param;
STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg;
SRpcMsg *pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall();
......@@ -215,6 +233,7 @@ static void *syncIOConsumerFunc(void *param) {
syncPingFromRpcMsg(pRpcMsg, pSyncMsg);
// memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen);
io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
......@@ -223,6 +242,16 @@ static void *syncIOConsumerFunc(void *param) {
pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen);
syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_TIMEOUT) {
if (io->FpOnSyncTimeout != NULL) {
SyncTimeout *pSyncMsg;
pSyncMsg = syncTimeoutBuild();
syncTimeoutFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
}
} else {
;
......
......@@ -25,7 +25,10 @@ static int32_t tsNodeRefId = -1;
// ------ local funciton ---------
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static void syncNodePingTimerCb(void* param, void* tmrId);
static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
......@@ -37,6 +40,7 @@ static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
// ---------------------------------
int32_t syncInit() {
......@@ -56,8 +60,6 @@ void syncStop(int64_t rid) {}
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; }
// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; }
ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }
......@@ -76,6 +78,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->rpcClient = pSyncInfo->rpcClient;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->queue = pSyncInfo->queue;
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;
......@@ -93,8 +97,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->pPingTimer = NULL;
pSyncNode->pingTimerMS = 1000;
atomic_store_8(&pSyncNode->pingTimerStart, 0);
pSyncNode->FpPingTimer = syncNodePingTimerCb;
atomic_store_8(&pSyncNode->pingTimerEnable, 0);
pSyncNode->FpPingTimer = syncNodeEqPingTimer;
pSyncNode->pingTimerCounter = 0;
pSyncNode->FpOnPing = syncNodeOnPingCb;
......@@ -103,6 +107,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
return pSyncNode;
}
......@@ -148,22 +153,76 @@ void syncNodePingSelf(SSyncNode* pSyncNode) {
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
if (pSyncNode->pPingTimer == NULL) {
pSyncNode->pPingTimer =
taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager);
taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
} else {
taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager,
taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
}
atomic_store_8(&pSyncNode->pingTimerStart, 1);
atomic_store_8(&pSyncNode->pingTimerEnable, 1);
return 0;
}
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->pingTimerStart, 0);
pSyncNode->pingTimerCounter = TIMER_MAX_MS;
atomic_store_8(&pSyncNode->pingTimerEnable, 0);
pSyncNode->pingTimerMS = TIMER_MAX_MS;
return 0;
}
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) {
if (pSyncNode->pElectTimer == NULL) {
pSyncNode->pElectTimer =
taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager);
} else {
taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pElectTimer);
}
atomic_store_8(&pSyncNode->electTimerEnable, 1);
return 0;
}
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->electTimerEnable, 0);
pSyncNode->electTimerMS = TIMER_MAX_MS;
return 0;
}
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; }
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
if (pSyncNode->pHeartbeatTimer == NULL) {
pSyncNode->pHeartbeatTimer =
taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager);
} else {
taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pHeartbeatTimer);
}
atomic_store_8(&pSyncNode->heartbeatTimerEnable, 1);
return 0;
}
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->heartbeatTimerEnable, 0);
pSyncNode->heartbeatTimerMS = TIMER_MAX_MS;
return 0;
}
int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; }
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {}
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {}
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {}
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {}
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {}
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {}
// ------ local funciton ---------
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
sTrace("syncNodePing pSyncNode:%p ", pSyncNode);
......@@ -204,7 +263,6 @@ static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pM
}
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
sTrace("syncNodeSendMsgById pSyncNode:%p ", pSyncNode);
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
......@@ -225,7 +283,7 @@ static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
{
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodeOnPingCb syncNodePing pMsg:%s ", serialized);
sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
......@@ -245,7 +303,7 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
{
cJSON* pJson = syncPingReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodeOnPingReplyCb syncNodePing pMsg:%s ", serialized);
sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
......@@ -273,22 +331,50 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR
return ret;
}
static void syncNodePingTimerCb(void* param, void* tmrId) {
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
int32_t ret = 0;
sTrace("<-- syncNodeOnTimeoutCb -->");
{
cJSON* pJson = syncTimeout2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
if (atomic_load_8(&ths->pingTimerEnable)) {
++(ths->pingTimerCounter);
syncNodePingAll(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
} else {
}
return ret;
}
static void syncNodeEqPingTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_8(&pSyncNode->pingTimerStart)) {
++(pSyncNode->pingTimerCounter);
if (atomic_load_8(&pSyncNode->pingTimerEnable)) {
// pSyncNode->pingTimerMS += 100;
sTrace(
"syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, "
"tmrId:%p ",
pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId);
syncNodePingAll(pSyncNode);
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, pSyncNode);
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncTimeoutDestroy(pSyncMsg);
taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
} else {
sTrace("syncNodePingTimerCb: pingTimerStart:%u ", pSyncNode->pingTimerStart);
sTrace("syncNodeEqPingTimer: pingTimerEnable:%u ", pSyncNode->pingTimerEnable);
}
}
\ No newline at end of file
}
static void syncNodeEqElectTimer(void* param, void* tmrId) {}
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {}
\ No newline at end of file
......@@ -20,6 +20,124 @@
void onMessage(SRaft* pRaft, void* pMsg) {}
// ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
cJSON* pRoot;
// in compiler optimization, switch case = if else constants
if (pRpcMsg->msgType == SYNC_TIMEOUT) {
SyncTimeout* pSyncMsg = (SyncTimeout*)pRpcMsg->pCont;
pRoot = syncTimeout2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_PING) {
SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont;
pRoot = syncPing2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
SyncPingReply* pSyncMsg = (SyncPingReply*)pRpcMsg->pCont;
pRoot = syncPingReply2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) {
pRoot = syncRpcUnknownMsg2Json();
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) {
pRoot = syncRpcUnknownMsg2Json();
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
SyncRequestVote* pSyncMsg = (SyncRequestVote*)pRpcMsg->pCont;
pRoot = syncRequestVote2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply* pSyncMsg = (SyncRequestVoteReply*)pRpcMsg->pCont;
pRoot = syncRequestVoteReply2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) {
SyncAppendEntries* pSyncMsg = (SyncAppendEntries*)pRpcMsg->pCont;
pRoot = syncAppendEntries2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply* pSyncMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont;
pRoot = syncAppendEntriesReply2Json(pSyncMsg);
} else {
pRoot = syncRpcUnknownMsg2Json();
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "RpcMsg", pRoot);
return pJson;
}
cJSON* syncRpcUnknownMsg2Json() {
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "msgType", SYNC_UNKNOWN);
cJSON_AddStringToObject(pRoot, "data", "known message");
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncPing", pRoot);
return pJson;
}
// ---- message process SyncTimeout----
SyncTimeout* syncTimeoutBuild() {
uint32_t bytes = sizeof(SyncTimeout);
SyncTimeout* pMsg = malloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = SYNC_TIMEOUT;
return pMsg;
}
void syncTimeoutDestroy(SyncTimeout* pMsg) {
if (pMsg != NULL) {
free(pMsg);
}
}
void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen) {
assert(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg) {
memcpy(pMsg, buf, len);
assert(len == pMsg->bytes);
}
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncTimeoutSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) {
syncTimeoutDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
char u64buf[128];
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType);
snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data);
cJSON_AddStringToObject(pRoot, "data", u64buf);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot);
return pJson;
}
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data) {
SyncTimeout* pMsg = syncTimeoutBuild();
pMsg->timeoutType = timeoutType;
pMsg->data = data;
return pMsg;
}
// ---- message process SyncPing----
SyncPing* syncPingBuild(uint32_t dataLen) {
uint32_t bytes = SYNC_PING_FIX_LEN + dataLen;
......
......@@ -14,3 +14,5 @@
*/
#include "syncReplication.h"
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {}
\ No newline at end of file
......@@ -68,8 +68,12 @@ void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaft
raftId->vgId = vgId;
}
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
bool ret = pId1->addr == pId2->addr && pId1->vgId == pId2->vgId;
return ret;
}
// ---- SSyncBuffer -----
#if 0
void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) {
syncBuf->len = len;
syncBuf->data = malloc(syncBuf->len);
......@@ -87,4 +91,3 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
dest->data = malloc(dest->len);
memcpy(dest->data, src->data, dest->len);
}
#endif
\ No newline at end of file
......@@ -14,3 +14,83 @@
*/
#include "syncVoteMgr.h"
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
SVotesGranted *pVotesGranted = malloc(sizeof(SVotesGranted));
assert(pVotesGranted != NULL);
memset(pVotesGranted, 0, sizeof(SVotesGranted));
pVotesGranted->quorum = pSyncNode->quorum;
pVotesGranted->term = 0;
pVotesGranted->votes = 0;
pVotesGranted->toLeader = false;
pVotesGranted->pSyncNode = pSyncNode;
return pVotesGranted;
}
void voteGrantedDestroy(SVotesGranted *pVotesGranted) {
if (pVotesGranted != NULL) {
free(pVotesGranted);
}
}
bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
bool ret = pVotesGranted->votes >= pVotesGranted->quorum;
return ret;
}
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
assert(pMsg->voteGranted == true);
assert(pMsg->term == pVotesGranted->term);
pVotesGranted->votes++;
}
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
pVotesGranted->term = term;
pVotesGranted->votes = 0;
pVotesGranted->toLeader = false;
}
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond));
assert(pVotesRespond != NULL);
memset(pVotesRespond, 0, sizeof(SVotesRespond));
pVotesRespond->replicas = &(pSyncNode->replicasId);
pVotesRespond->replicaNum = pSyncNode->replicaNum;
pVotesRespond->term = 0;
pVotesRespond->pSyncNode = pSyncNode;
return pVotesRespond;
}
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
bool ret = false;
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
if (syncUtilSameId(&(*pVotesRespond->replicas)[i], pRaftId) && pVotesRespond->isRespond[i]) {
ret = true;
break;
}
}
return ret;
}
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) {
assert(pVotesRespond->term == pMsg->term);
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
if (syncUtilSameId(&(*pVotesRespond->replicas)[i], &pMsg->srcId)) {
assert(pVotesRespond->isRespond[i] == false);
pVotesRespond->isRespond[i] = true;
return;
}
}
assert(0);
}
void Reset(SVotesRespond *pVotesRespond, SyncTerm term) {
pVotesRespond->term = term;
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
pVotesRespond->isRespond[i] = false;
}
}
\ No newline at end of file
......@@ -8,6 +8,7 @@ add_executable(syncIOSendMsgTest "")
add_executable(syncIOSendMsgClientTest "")
add_executable(syncIOSendMsgServerTest "")
add_executable(syncRaftStoreTest "")
add_executable(syncEnqTest "")
target_sources(syncTest
......@@ -50,6 +51,10 @@ target_sources(syncRaftStoreTest
PRIVATE
"syncRaftStoreTest.cpp"
)
target_sources(syncEnqTest
PRIVATE
"syncEnqTest.cpp"
)
target_include_directories(syncTest
......@@ -102,6 +107,11 @@ target_include_directories(syncRaftStoreTest
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncEnqTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
......@@ -144,6 +154,10 @@ target_link_libraries(syncRaftStoreTest
sync
gtest_main
)
target_link_libraries(syncEnqTest
sync
gtest_main
)
enable_testing()
......
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[3] = {7010, 7110, 7210};
SSyncNode* doSync(int myIndex) {
SSyncFSM* pFsm;
SSyncInfo syncInfo;
syncInfo.vgId = 1;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = 3;
pCfg->nodeInfo[0].nodePort = ports[0];
snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
pCfg->nodeInfo[1].nodePort = ports[1];
snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
pCfg->nodeInfo[2].nodePort = ports[2];
snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
}
void timerPingAll(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
syncNodePingAll(pSyncNode);
}
int main(int argc, char** argv) {
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
int myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
if (myIndex > 2 || myIndex < 0) {
fprintf(stderr, "myIndex:%d error. should be 0 - 2", myIndex);
return 1;
}
}
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
SSyncNode* pSyncNode = doSync(myIndex);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
for (int i = 0; i < 10; ++i) {
SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->raftId, &pSyncNode->raftId);
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
taosMsleep(1000);
}
while (1) {
taosMsleep(1000);
}
return 0;
}
......@@ -22,6 +22,8 @@ SSyncNode* doSync(int myIndex) {
syncInfo.vgId = 1;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping");
......@@ -76,15 +78,14 @@ int main(int argc, char** argv) {
SSyncNode* pSyncNode = doSync(myIndex);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
/*
taosMsleep(10000);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
*/
taosMsleep(10000);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
while (1) {
taosMsleep(1000);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册