提交 b5902836 编写于 作者: M Minghao Li

sync refactor

上级 f2ac7957
...@@ -30,12 +30,17 @@ extern "C" { ...@@ -30,12 +30,17 @@ extern "C" {
#define TIMER_MAX_MS 0x7FFFFFFF #define TIMER_MAX_MS 0x7FFFFFFF
#define PING_TIMER_MS 1000 #define PING_TIMER_MS 1000
#define ELECT_TIMER_MS_MIN 150
#define ELECT_TIMER_MS_MAX 300
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
#define HEARTBEAT_TIMER_MS 30
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
typedef struct SSyncEnv { typedef struct SSyncEnv {
tmr_h pEnvTickTimer; tmr_h pEnvTickTimer;
tmr_h pTimerManager; tmr_h pTimerManager;
char name[128]; char name[128];
} SSyncEnv; } SSyncEnv;
extern SSyncEnv* gSyncEnv; extern SSyncEnv* gSyncEnv;
......
...@@ -154,9 +154,8 @@ typedef struct SSyncNode { ...@@ -154,9 +154,8 @@ typedef struct SSyncNode {
SyncIndex commitIndex; SyncIndex commitIndex;
// timer // timer
tmr_h pPingTimer; tmr_h pPingTimer;
int32_t pingTimerMS; int32_t pingTimerMS;
// uint8_t pingTimerEnable;
uint64_t pingTimerLogicClock; uint64_t pingTimerLogicClock;
uint64_t pingTimerLogicClockUser; uint64_t pingTimerLogicClockUser;
TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp
...@@ -164,13 +163,15 @@ typedef struct SSyncNode { ...@@ -164,13 +163,15 @@ typedef struct SSyncNode {
tmr_h pElectTimer; tmr_h pElectTimer;
int32_t electTimerMS; int32_t electTimerMS;
uint8_t electTimerEnable; uint64_t electTimerLogicClock;
uint64_t electTimerLogicClockUser;
TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp
uint64_t electTimerCounter; uint64_t electTimerCounter;
tmr_h pHeartbeatTimer; tmr_h pHeartbeatTimer;
int32_t heartbeatTimerMS; int32_t heartbeatTimerMS;
uint8_t heartbeatTimerEnable; uint64_t heartbeatTimerLogicClock;
uint64_t heartbeatTimerLogicClockUser;
TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp
uint64_t heartbeatTimerCounter; uint64_t heartbeatTimerCounter;
...@@ -194,8 +195,9 @@ void syncNodePingSelf(SSyncNode* pSyncNode); ...@@ -194,8 +195,9 @@ void syncNodePingSelf(SSyncNode* pSyncNode);
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
......
...@@ -59,6 +59,7 @@ typedef struct SyncTimeout { ...@@ -59,6 +59,7 @@ typedef struct SyncTimeout {
uint32_t msgType; uint32_t msgType;
ESyncTimeoutType timeoutType; ESyncTimeoutType timeoutType;
uint64_t logicClock; uint64_t logicClock;
int32_t timerMS;
void* data; void* data;
} SyncTimeout; } SyncTimeout;
...@@ -69,7 +70,7 @@ void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* ...@@ -69,7 +70,7 @@ void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout*
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg);
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg);
cJSON* syncTimeout2Json(const SyncTimeout* pMsg); cJSON* syncTimeout2Json(const SyncTimeout* pMsg);
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, void* data); SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, void* data);
// --------------------------------------------- // ---------------------------------------------
typedef struct SyncPing { typedef struct SyncPing {
......
...@@ -28,28 +28,23 @@ extern "C" { ...@@ -28,28 +28,23 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
// ---- encode / decode // ---- encode / decode
uint64_t syncUtilAddr2U64(const char* host, uint16_t port); uint64_t syncUtilAddr2U64(const char* host, uint16_t port);
void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port);
void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port); void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet);
void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId);
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2);
void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId);
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2);
// ---- SSyncBuffer ---- // ---- SSyncBuffer ----
void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len); void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len);
void syncUtilbufDestroy(SSyncBuffer* syncBuf); void syncUtilbufDestroy(SSyncBuffer* syncBuf);
void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest); void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest);
void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest);
// ---- misc ----
int32_t syncUtilRand(int32_t max);
int32_t syncUtilElectRandomMS();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -28,6 +28,7 @@ static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer); ...@@ -28,6 +28,7 @@ static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer);
int32_t syncEnvStart() { int32_t syncEnvStart() {
int32_t ret; int32_t ret;
srand(time(NULL));
gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv)); gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv));
assert(gSyncEnv != NULL); assert(gSyncEnv != NULL);
ret = doSyncEnvStart(gSyncEnv); ret = doSyncEnvStart(gSyncEnv);
......
...@@ -44,6 +44,7 @@ int32_t syncIOStart(char *host, uint16_t port) { ...@@ -44,6 +44,7 @@ int32_t syncIOStart(char *host, uint16_t port) {
gSyncIO = syncIOCreate(host, port); gSyncIO = syncIOCreate(host, port);
assert(gSyncIO != NULL); assert(gSyncIO != NULL);
srand(time(NULL));
int32_t ret = syncIOStartInternal(gSyncIO); int32_t ret = syncIOStartInternal(gSyncIO);
assert(ret == 0); assert(ret == 0);
......
...@@ -98,6 +98,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -98,6 +98,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId); syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);
// init ping timer
pSyncNode->pPingTimer = NULL; pSyncNode->pPingTimer = NULL;
pSyncNode->pingTimerMS = PING_TIMER_MS; pSyncNode->pingTimerMS = PING_TIMER_MS;
atomic_store_64(&pSyncNode->pingTimerLogicClock, 0); atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
...@@ -105,6 +106,22 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -105,6 +106,22 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->FpPingTimer = syncNodeEqPingTimer; pSyncNode->FpPingTimer = syncNodeEqPingTimer;
pSyncNode->pingTimerCounter = 0; pSyncNode->pingTimerCounter = 0;
// init elect timer
pSyncNode->pElectTimer = NULL;
pSyncNode->electTimerMS = syncUtilElectRandomMS();
atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
pSyncNode->FpElectTimer = syncNodeEqElectTimer;
pSyncNode->electTimerCounter = 0;
// init heartbeat timer
pSyncNode->pHeartbeatTimer = NULL;
pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS;
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
pSyncNode->FpHeartbeatTimer = syncNodeEqHeartbeatTimer;
pSyncNode->heartbeatTimerCounter = 0;
pSyncNode->FpOnPing = syncNodeOnPingCb; pSyncNode->FpOnPing = syncNodeOnPingCb;
pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
...@@ -157,7 +174,6 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { ...@@ -157,7 +174,6 @@ void syncNodePingSelf(SSyncNode* pSyncNode) {
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
pSyncNode->pingTimerMS = PING_TIMER_MS; pSyncNode->pingTimerMS = PING_TIMER_MS;
if (pSyncNode->pPingTimer == NULL) { if (pSyncNode->pPingTimer == NULL) {
pSyncNode->pPingTimer = pSyncNode->pPingTimer =
taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager); taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
...@@ -165,7 +181,6 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { ...@@ -165,7 +181,6 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer); &pSyncNode->pPingTimer);
} }
return 0; return 0;
} }
...@@ -175,7 +190,9 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { ...@@ -175,7 +190,9 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
return 0; return 0;
} }
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
pSyncNode->electTimerMS = ms;
atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
if (pSyncNode->pElectTimer == NULL) { if (pSyncNode->pElectTimer == NULL) {
pSyncNode->pElectTimer = pSyncNode->pElectTimer =
taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager); taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager);
...@@ -183,18 +200,23 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) { ...@@ -183,18 +200,23 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) {
taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pElectTimer); &pSyncNode->pElectTimer);
} }
atomic_store_8(&pSyncNode->electTimerEnable, 1);
return 0; return 0;
} }
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->electTimerEnable, 0); atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);
pSyncNode->electTimerMS = TIMER_MAX_MS; pSyncNode->electTimerMS = TIMER_MAX_MS;
return 0; return 0;
} }
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
syncNodeStopElectTimer(pSyncNode);
syncNodeStartElectTimer(pSyncNode, ms);
return 0;
}
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
if (pSyncNode->pHeartbeatTimer == NULL) { if (pSyncNode->pHeartbeatTimer == NULL) {
pSyncNode->pHeartbeatTimer = pSyncNode->pHeartbeatTimer =
taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager); taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager);
...@@ -202,13 +224,11 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { ...@@ -202,13 +224,11 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pHeartbeatTimer); &pSyncNode->pHeartbeatTimer);
} }
atomic_store_8(&pSyncNode->heartbeatTimerEnable, 1);
return 0; return 0;
} }
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->heartbeatTimerEnable, 0); atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
pSyncNode->heartbeatTimerMS = TIMER_MAX_MS; pSyncNode->heartbeatTimerMS = TIMER_MAX_MS;
return 0; return 0;
} }
...@@ -320,36 +340,56 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { ...@@ -320,36 +340,56 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
static void syncNodeEqPingTimer(void* param, void* tmrId) { static void syncNodeEqPingTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param; SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) { if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
pSyncNode->pingTimerMS, pSyncNode);
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncTimeoutDestroy(pSyncMsg);
// reset timer ms
// pSyncNode->pingTimerMS += 100; // pSyncNode->pingTimerMS += 100;
SyncTimeout* pSyncMsg = taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), pSyncNode); &pSyncNode->pPingTimer);
} else {
sTrace("syncNodeEqPingTimer: pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", pSyncNode->pingTimerLogicClock,
pSyncNode->pingTimerLogicClockUser);
}
}
static void syncNodeEqElectTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) {
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
pSyncNode->electTimerMS, pSyncNode);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
// reset timer ms
pSyncNode->electTimerMS = syncUtilElectRandomMS();
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer); &pSyncNode->pPingTimer);
} else { } else {
sTrace("syncNodeEqPingTimer: pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", pSyncNode->pingTimerLogicClock, sTrace("syncNodeEqElectTimer: electTimerLogicClock:%lu, electTimerLogicClockUser:%lu",
pSyncNode->pingTimerLogicClockUser); pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
} }
} }
static void syncNodeEqElectTimer(void* param, void* tmrId) {}
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {}
static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
pSyncNode->leaderCache.addr = 0; pSyncNode->leaderCache = EMPTY_RAFT_ID;
pSyncNode->leaderCache.vgId = 0;
} }
syncNodeStopHeartbeatTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode);
syncNodeStartElectTimer(pSyncNode); int32_t electMS = syncUtilElectRandomMS();
syncNodeStartElectTimer(pSyncNode, electMS);
} }
static void syncNodeBecomeLeader(SSyncNode* pSyncNode) { static void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
......
...@@ -125,6 +125,7 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { ...@@ -125,6 +125,7 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->logicClock); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->logicClock);
cJSON_AddStringToObject(pRoot, "logicClock", u64buf); cJSON_AddStringToObject(pRoot, "logicClock", u64buf);
cJSON_AddNumberToObject(pRoot, "timerMS", pMsg->timerMS);
snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data);
cJSON_AddStringToObject(pRoot, "data", u64buf); cJSON_AddStringToObject(pRoot, "data", u64buf);
...@@ -133,10 +134,11 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { ...@@ -133,10 +134,11 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
return pJson; return pJson;
} }
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, void* data) { SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, void* data) {
SyncTimeout* pMsg = syncTimeoutBuild(); SyncTimeout* pMsg = syncTimeoutBuild();
pMsg->timeoutType = timeoutType; pMsg->timeoutType = timeoutType;
pMsg->logicClock = logicClock; pMsg->logicClock = logicClock;
pMsg->timerMS = timerMS;
pMsg->data = data; pMsg->data = data;
return pMsg; return pMsg;
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/socket.h> #include <sys/socket.h>
#include "syncEnv.h"
// ---- encode / decode // ---- encode / decode
uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { uint64_t syncUtilAddr2U64(const char* host, uint16_t port) {
...@@ -91,3 +92,9 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { ...@@ -91,3 +92,9 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
dest->data = malloc(dest->len); dest->data = malloc(dest->len);
memcpy(dest->data, src->data, dest->len); memcpy(dest->data, src->data, dest->len);
} }
// ---- misc ----
int32_t syncUtilRand(int32_t max) { return rand() % max; }
int32_t syncUtilElectRandomMS() { ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); }
\ No newline at end of file
...@@ -13,17 +13,21 @@ void print(SHashObj *pNextIndex) { ...@@ -13,17 +13,21 @@ void print(SHashObj *pNextIndex) {
} }
} }
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
int main() { int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10); // taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = 143 + 64; sDebugFlag = 143 + 64;
sTrace("sync log test: trace"); logTest();
sDebug("sync log test: debug");
sInfo("sync log test: info");
sWarn("sync log test: warn");
sError("sync log test: error");
sFatal("sync log test: fatal");
SRaftId me; SRaftId me;
SRaftId peer1; SRaftId peer1;
......
...@@ -4,14 +4,13 @@ ...@@ -4,14 +4,13 @@
#include "syncIO.h" #include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
void *pingFunc(void *param) { void logTest() {
SSyncIO *io = (SSyncIO *)param; sTrace("--- sync log test: trace");
while (1) { sDebug("--- sync log test: debug");
sDebug("io->ping"); sInfo("--- sync log test: info");
// io->ping(io); sWarn("--- sync log test: warn");
sleep(1); sError("--- sync log test: error");
} sFatal("--- sync log test: fatal");
return NULL;
} }
int main() { int main() {
...@@ -19,12 +18,7 @@ int main() { ...@@ -19,12 +18,7 @@ int main() {
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = 143 + 64; sDebugFlag = 143 + 64;
sTrace("sync log test: trace"); logTest();
sDebug("sync log test: debug");
sInfo("sync log test: info");
sWarn("sync log test: warn");
sError("sync log test: error");
sFatal("sync log test: fatal");
SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json");
assert(pRaftStore != NULL); assert(pRaftStore != NULL);
......
...@@ -4,55 +4,20 @@ ...@@ -4,55 +4,20 @@
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
void *pingFunc(void *param) { void logTest() {
SSyncIO *io = (SSyncIO *)param; sTrace("--- sync log test: trace");
while (1) { sDebug("--- sync log test: debug");
sDebug("io->ping"); sInfo("--- sync log test: info");
// io->ping(io); sWarn("--- sync log test: warn");
sleep(1); sError("--- sync log test: error");
} sFatal("--- sync log test: fatal");
return NULL;
} }
int main() { int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10); // taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = 143 + 64; sDebugFlag = 143 + 64;
logTest();
sTrace("sync log test: trace");
sDebug("sync log test: debug");
sInfo("sync log test: info");
sWarn("sync log test: warn");
sError("sync log test: error");
sFatal("sync log test: fatal");
SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json");
// assert(pRaftStore != NULL);
// raftStorePrint(pRaftStore);
// pRaftStore->currentTerm = 100;
// pRaftStore->voteFor.addr = 200;
// pRaftStore->voteFor.vgId = 300;
// raftStorePrint(pRaftStore);
// raftStorePersist(pRaftStore);
// sDebug("sync test");
// SSyncIO *syncIO = syncIOCreate();
// assert(syncIO != NULL);
// syncIO->start(syncIO);
// sleep(2);
// pthread_t tid;
// pthread_create(&tid, NULL, pingFunc, syncIO);
// while (1) {
// sleep(1);
// }
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册