提交 be532bd5 编写于 作者: M Minghao Li

sync refactor

上级 94efe101
......@@ -29,6 +29,7 @@ extern "C" {
#include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 1000
#define ELECT_TIMER_MS_MIN 150
#define ELECT_TIMER_MS_MAX 300
......@@ -38,17 +39,28 @@ extern "C" {
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
typedef struct SSyncEnv {
tmr_h pEnvTickTimer;
// tick timer
tmr_h pEnvTickTimer;
int32_t envTickTimerMS;
uint64_t envTickTimerLogicClock;
uint64_t envTickTimerLogicClockUser;
TAOS_TMR_CALLBACK FpEnvTickTimer; // Timer Fp
uint64_t envTickTimerCounter;
// timer manager
tmr_h pTimerManager;
char name[128];
// other resources shared by SyncNodes
// ...
} SSyncEnv;
extern SSyncEnv* gSyncEnv;
int32_t syncEnvStart();
int32_t syncEnvStop();
tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void* param);
void syncEnvStopTimer(tmr_h* pTimer);
int32_t syncEnvStartTimer();
int32_t syncEnvStopTimer();
#ifdef __cplusplus
}
......
......@@ -19,19 +19,18 @@
SSyncEnv *gSyncEnv = NULL;
// local function -----------------
static void syncEnvTick(void *param, void *tmrId);
static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv);
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv);
static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int mseconds, void *param);
static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer);
static SSyncEnv *doSyncEnvStart();
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv);
static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv);
static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv);
static void syncEnvTick(void *param, void *tmrId);
// --------------------------------
int32_t syncEnvStart() {
int32_t ret;
int32_t ret = 0;
taosSeedRand(taosGetTimestampSec());
gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv));
gSyncEnv = doSyncEnvStart(gSyncEnv);
assert(gSyncEnv != NULL);
ret = doSyncEnvStart(gSyncEnv);
return ret;
}
......@@ -40,31 +39,46 @@ int32_t syncEnvStop() {
return ret;
}
tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void *param) {
return doSyncEnvStartTimer(gSyncEnv, fp, mseconds, param);
int32_t syncEnvStartTimer() {
int32_t ret = doSyncEnvStartTimer(gSyncEnv);
return ret;
}
void syncEnvStopTimer(tmr_h *pTimer) { doSyncEnvStopTimer(gSyncEnv, pTimer); }
int32_t syncEnvStopTimer() {
int32_t ret = doSyncEnvStopTimer(gSyncEnv);
return ret;
}
// local function -----------------
static void syncEnvTick(void *param, void *tmrId) {
SSyncEnv *pSyncEnv = (SSyncEnv *)param;
sTrace("syncEnvTick ... name:%s ", pSyncEnv->name);
pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager);
if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) {
++(pSyncEnv->envTickTimerCounter);
sTrace(
"syncEnvTick ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, "
"envTickTimerMS:%d",
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
pSyncEnv->envTickTimerMS);
// do something, tick ...
taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer);
}
}
static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) {
snprintf(pSyncEnv->name, sizeof(pSyncEnv->name), "SyncEnv_%p", pSyncEnv);
static SSyncEnv *doSyncEnvStart() {
SSyncEnv *pSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv));
assert(pSyncEnv != NULL);
memset(pSyncEnv, 0, sizeof(pSyncEnv));
pSyncEnv->envTickTimerCounter = 0;
pSyncEnv->envTickTimerMS = ENV_TICK_TIMER_MS;
pSyncEnv->FpEnvTickTimer = syncEnvTick;
atomic_store_64(&pSyncEnv->envTickTimerLogicClock, 0);
atomic_store_64(&pSyncEnv->envTickTimerLogicClockUser, 0);
// start tmr thread
pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
// pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager);
sTrace("SyncEnv start ok, name:%s", pSyncEnv->name);
return 0;
return pSyncEnv;
}
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) {
......@@ -72,8 +86,17 @@ static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) {
return 0;
}
static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int mseconds, void *param) {
return taosTmrStart(fp, mseconds, pSyncEnv, pSyncEnv->pTimerManager);
static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv) {
int32_t ret = 0;
pSyncEnv->pEnvTickTimer =
taosTmrStart(pSyncEnv->FpEnvTickTimer, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager);
atomic_store_64(&pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerLogicClockUser);
return ret;
}
static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer) {}
static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv) {
atomic_add_fetch_64(&pSyncEnv->envTickTimerLogicClockUser, 1);
taosTmrStop(pSyncEnv->pEnvTickTimer);
pSyncEnv->pEnvTickTimer = NULL;
return 0;
}
......@@ -14,15 +14,6 @@ void logTest() {
sFatal("--- sync log test: fatal");
}
void *pTimer = NULL;
void *pTimerMgr = NULL;
int g = 300;
static void timerFp(void *param, void *tmrId) {
printf("param:%p, tmrId:%p, pTimer:%p, pTimerMgr:%p \n", param, tmrId, pTimer, pTimerMgr);
taosTmrReset(timerFp, 1000, param, pTimerMgr, &pTimer);
}
int main() {
// taosInitLog((char*)"syncEnvTest.log", 100000, 10);
tsAsyncLog = 0;
......@@ -34,13 +25,20 @@ int main() {
ret = syncEnvStart();
assert(ret == 0);
// timer
pTimerMgr = taosTmrInit(1000, 50, 10000, "SYNC-ENV-TEST");
taosTmrStart(timerFp, 1000, &g, pTimerMgr);
for (int i = 0; i < 5; ++i) {
ret = syncEnvStartTimer();
assert(ret == 0);
taosMsleep(5000);
while (1) {
taosMsleep(1000);
ret = syncEnvStopTimer();
assert(ret == 0);
taosMsleep(5000);
}
ret = syncEnvStop();
assert(ret == 0);
return 0;
}
......@@ -100,7 +100,7 @@ SyncAppendEntriesReply *createSyncAppendEntriesReply() {
void test1() {
SyncTimeout *pMsg = createSyncTimeout();
SRpcMsg rpcMsg;
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test1", &rpcMsg);
syncTimeoutDestroy(pMsg);
......@@ -108,7 +108,7 @@ void test1() {
void test2() {
SyncPing *pMsg = createSyncPing();
SRpcMsg rpcMsg;
SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test2", &rpcMsg);
syncPingDestroy(pMsg);
......@@ -116,7 +116,7 @@ void test2() {
void test3() {
SyncPingReply *pMsg = createSyncPingReply();
SRpcMsg rpcMsg;
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test3", &rpcMsg);
syncPingReplyDestroy(pMsg);
......@@ -132,7 +132,7 @@ void test4() {
void test5() {
SyncRequestVoteReply *pMsg = createSyncRequestVoteReply();
SRpcMsg rpcMsg;
SRpcMsg rpcMsg;
syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test5", &rpcMsg);
syncRequestVoteReplyDestroy(pMsg);
......@@ -140,7 +140,7 @@ void test5() {
void test6() {
SyncAppendEntries *pMsg = createSyncAppendEntries();
SRpcMsg rpcMsg;
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test6", &rpcMsg);
syncAppendEntriesDestroy(pMsg);
......@@ -148,7 +148,7 @@ void test6() {
void test7() {
SyncAppendEntriesReply *pMsg = createSyncAppendEntriesReply();
SRpcMsg rpcMsg;
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test7", &rpcMsg);
syncAppendEntriesReplyDestroy(pMsg);
......@@ -156,7 +156,7 @@ void test7() {
void test8() {
SyncClientRequest *pMsg = createSyncClientRequest();
SRpcMsg rpcMsg;
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test8", &rpcMsg);
syncClientRequestDestroy(pMsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册