提交 c5602fba 编写于 作者: L Li Minghao

add flag in syncEnv, to deal with expired timer

上级 3352f952
...@@ -158,6 +158,8 @@ typedef enum { ...@@ -158,6 +158,8 @@ typedef enum {
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart();
extern int32_t sDebugFlag; extern int32_t sDebugFlag;
//----------------------------------------- //-----------------------------------------
......
...@@ -193,84 +193,89 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { ...@@ -193,84 +193,89 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// sync integration // sync integration
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
assert(pSyncNode != NULL);
ESyncState state = syncGetMyRole(pVnode->sync); if (syncEnvIsStart()) {
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
SMsgHead *pHead = pMsg->pCont; SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
assert(pSyncNode != NULL);
char logBuf[512]; ESyncState state = syncGetMyRole(pVnode->sync);
char *syncNodeStr = sync2SimpleStr(pVnode->sync); SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
SRpcMsg *pRpcMsg = pMsg; SMsgHead *pHead = pMsg->pCont;
if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { char logBuf[512];
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); char *syncNodeStr = sync2SimpleStr(pVnode->sync);
assert(pSyncMsg != NULL); snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); SRpcMsg *pRpcMsg = pMsg;
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) { if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnPingCb(pSyncNode, pSyncMsg); syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg); syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg); syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg); syncAppendEntriesDestroy(pSyncMsg);
} else { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
} assert(pSyncMsg != NULL);
syncNodeRelease(pSyncNode); syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
}
syncNodeRelease(pSyncNode);
} else {
vError("==vnodeProcessSyncReq== error syncEnv stop");
}
return 0; return 0;
} }
......
...@@ -39,6 +39,8 @@ extern "C" { ...@@ -39,6 +39,8 @@ extern "C" {
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
typedef struct SSyncEnv { typedef struct SSyncEnv {
uint8_t isStart;
// tick timer // tick timer
tmr_h pEnvTickTimer; tmr_h pEnvTickTimer;
int32_t envTickTimerMS; int32_t envTickTimerMS;
......
...@@ -26,6 +26,14 @@ static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv); ...@@ -26,6 +26,14 @@ static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv);
static void syncEnvTick(void *param, void *tmrId); static void syncEnvTick(void *param, void *tmrId);
// -------------------------------- // --------------------------------
bool syncEnvIsStart() {
if (gSyncEnv == NULL) {
return false;
}
return atomic_load_8(&(gSyncEnv->isStart));
}
int32_t syncEnvStart() { int32_t syncEnvStart() {
int32_t ret = 0; int32_t ret = 0;
taosSeedRand(taosGetTimestampSec()); taosSeedRand(taosGetTimestampSec());
...@@ -88,11 +96,15 @@ static SSyncEnv *doSyncEnvStart() { ...@@ -88,11 +96,15 @@ static SSyncEnv *doSyncEnvStart() {
// start tmr thread // start tmr thread
pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
atomic_store_8(&(pSyncEnv->isStart), 1);
return pSyncEnv; return pSyncEnv;
} }
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) {
assert(pSyncEnv == gSyncEnv); assert(pSyncEnv == gSyncEnv);
atomic_store_8(&(pSyncEnv->isStart), 0);
if (pSyncEnv != NULL) { if (pSyncEnv != NULL) {
taosTmrCleanUp(pSyncEnv->pTimerManager); taosTmrCleanUp(pSyncEnv->pTimerManager);
taosMemoryFree(pSyncEnv); taosMemoryFree(pSyncEnv);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册