提交 97a71fd8 编写于 作者: M Minghao Li

sync refactor

上级 d95593ac
......@@ -29,6 +29,7 @@ extern "C" {
#include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF
#define PING_TIMER_MS 1000
typedef struct SSyncEnv {
tmr_h pEnvTickTimer;
......
......@@ -154,9 +154,11 @@ typedef struct SSyncNode {
SyncIndex commitIndex;
// timer
tmr_h pPingTimer;
int32_t pingTimerMS;
uint8_t pingTimerEnable;
tmr_h pPingTimer;
int32_t pingTimerMS;
// uint8_t pingTimerEnable;
uint64_t pingTimerLogicClock;
uint64_t pingTimerLogicClockUser;
TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp
uint64_t pingTimerCounter;
......
......@@ -52,13 +52,13 @@ typedef enum ESyncTimeoutType {
SYNC_TIMEOUT_PING = 100,
SYNC_TIMEOUT_ELECTION,
SYNC_TIMEOUT_HEARTBEAT,
} ESyncTimeoutType;
typedef struct SyncTimeout {
uint32_t bytes;
uint32_t msgType;
ESyncTimeoutType timeoutType;
uint64_t logicClock;
void* data;
} SyncTimeout;
......@@ -69,7 +69,7 @@ void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout*
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg);
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg);
cJSON* syncTimeout2Json(const SyncTimeout* pMsg);
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data);
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, void* data);
// ---------------------------------------------
typedef struct SyncPing {
......
......@@ -80,7 +80,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
pMsg->msgType, pMsg->contLen);
{
cJSON *pJson = syncRpcMsg2Json(pMsg);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
sTrace("process syncMessage send: pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
......@@ -211,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param;
STaosQall *qall;
SRpcMsg *pRpcMsg, rpcMsg;
SRpcMsg * pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall();
......
......@@ -99,8 +99,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);
pSyncNode->pPingTimer = NULL;
pSyncNode->pingTimerMS = 1000;
atomic_store_8(&pSyncNode->pingTimerEnable, 0);
pSyncNode->pingTimerMS = PING_TIMER_MS;
atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
pSyncNode->FpPingTimer = syncNodeEqPingTimer;
pSyncNode->pingTimerCounter = 0;
......@@ -154,6 +155,9 @@ void syncNodePingSelf(SSyncNode* pSyncNode) {
}
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
pSyncNode->pingTimerMS = PING_TIMER_MS;
if (pSyncNode->pPingTimer == NULL) {
pSyncNode->pPingTimer =
taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
......@@ -162,12 +166,11 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
&pSyncNode->pPingTimer);
}
atomic_store_8(&pSyncNode->pingTimerEnable, 1);
return 0;
}
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->pingTimerEnable, 0);
atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
pSyncNode->pingTimerMS = TIMER_MAX_MS;
return 0;
}
......@@ -301,7 +304,7 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
}
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
if (atomic_load_8(&ths->pingTimerEnable)) {
if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->pingTimerCounter);
syncNodePingAll(ths);
}
......@@ -316,11 +319,13 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
static void syncNodeEqPingTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_8(&pSyncNode->pingTimerEnable)) {
if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
// pSyncNode->pingTimerMS += 100;
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, pSyncNode);
SRpcMsg rpcMsg;
SyncTimeout* pSyncMsg =
syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), pSyncNode);
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncTimeoutDestroy(pSyncMsg);
......@@ -328,7 +333,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
} else {
sTrace("syncNodeEqPingTimer: pingTimerEnable:%u ", pSyncNode->pingTimerEnable);
sTrace("syncNodeEqPingTimer: pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", pSyncNode->pingTimerLogicClock,
pSyncNode->pingTimerLogicClockUser);
}
}
......
......@@ -123,6 +123,8 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->logicClock);
cJSON_AddStringToObject(pRoot, "logicClock", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data);
cJSON_AddStringToObject(pRoot, "data", u64buf);
......@@ -131,9 +133,10 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
return pJson;
}
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data) {
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, void* data) {
SyncTimeout* pMsg = syncTimeoutBuild();
pMsg->timeoutType = timeoutType;
pMsg->logicClock = logicClock;
pMsg->data = data;
return pMsg;
}
......
#include <stdio.h>
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
......
#include <stdio.h>
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
......
#include <stdio.h>
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
......
#include <stdio.h>
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
......
#include <stdio.h>
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
......
......@@ -84,6 +84,17 @@ int main(int argc, char** argv) {
assert(ret == 0);
taosMsleep(10000);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
taosMsleep(10000);
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
taosMsleep(10000);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
......
#include <stdio.h>
#include <gtest/gtest.h>
#include "syncRaftStore.h"
//#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
......
#include <stdio.h>
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册