From 02f0f85aabb8f19a2d8d595aa991b0a12872c5e1 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 28 Feb 2022 14:10:34 +0800 Subject: [PATCH] sync modify timer --- include/libs/sync/sync.h | 4 +- source/libs/sync/inc/syncEnv.h | 10 ++-- source/libs/sync/inc/syncIO.h | 6 --- source/libs/sync/inc/syncInt.h | 38 ++++++++-------- source/libs/sync/inc/syncRaft.h | 4 ++ source/libs/sync/inc/syncRaftStore.h | 4 +- source/libs/sync/src/syncEnv.c | 47 ++++++++++++++++++- source/libs/sync/src/syncIO.c | 63 +++++++++++++++++--------- source/libs/sync/src/syncMain.c | 47 ++++++++++++++----- source/libs/sync/src/syncRaft.c | 4 ++ source/libs/sync/src/syncRaftStore.c | 16 +++---- source/libs/sync/test/syncEnvTest.cpp | 9 ++-- source/libs/sync/test/syncPingTest.cpp | 15 ++++-- source/libs/sync/test/syncTest.cpp | 4 +- 14 files changed, 187 insertions(+), 84 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a619e66622..5b387852fa 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -34,7 +34,9 @@ typedef enum { TAOS_SYNC_STATE_FOLLOWER = 0, TAOS_SYNC_STATE_CANDIDATE = 1, TAOS_SYNC_STATE_LEADER = 2, -} ESyncState; +} ESyncRole; + +typedef ESyncRole ESyncState; typedef struct SSyncBuffer { void* data; diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index f1c4327b69..960afe37b0 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -26,19 +26,21 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" #include "trpc.h" +#include "ttimer.h" typedef struct SSyncEnv { - void *pTimer; - void *pTimerManager; + tmr_h pEnvTickTimer; + tmr_h pTimerManager; + char name[128]; } SSyncEnv; int32_t syncEnvStart(); int32_t syncEnvStop(); -static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv); +tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void *param); -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); +void syncEnvStopTimer(tmr_h *pTimer); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 4b788efd79..49edeb638b 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -61,12 +61,6 @@ int32_t syncIOStop(); int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg); SSyncIO *syncIOCreate(); -static int32_t doSyncIOStart(SSyncIO *io); -static int32_t doSyncIOStop(SSyncIO *io); -static int32_t doSyncIOPing(SSyncIO *io); -static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); -static int32_t doSyncIODestroy(SSyncIO *io); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index ad8484662a..a8451b3dc4 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -26,6 +26,7 @@ extern "C" { #include "sync.h" #include "taosdef.h" #include "tlog.h" +#include "ttimer.h" extern int32_t sDebugFlag; @@ -87,15 +88,28 @@ typedef struct SyncAppendEntries SyncAppendEntries; struct SyncAppendEntriesReply; typedef struct SyncAppendEntriesReply SyncAppendEntriesReply; +typedef struct SRaftId { + SyncNodeId addr; + SyncGroupId vgId; +} SRaftId; + typedef struct SSyncNode { - int8_t replica; - int8_t quorum; + int8_t replica; + int8_t quorum; + int32_t refCount; + int64_t rid; SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; - SRaft* pRaft; + ESyncRole role; + SRaftId raftId; + SSyncFSM* pFsm; + + tmr_h pPingTimer; + tmr_h pElectionTimer; + tmr_h pHeartbeatTimer; int32_t (*FpPing)(struct SSyncNode* ths, const SyncPing* pMsg); @@ -123,23 +137,11 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); void syncNodeClose(SSyncNode* pSyncNode); -static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg); - -static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg); - -static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg); - -static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg); - -static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg); - -static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); - -static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg); +void syncNodePingAll(SSyncNode* pSyncNode); -static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg); +void syncNodePingPeers(SSyncNode* pSyncNode); -static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); +void syncNodePingSelf(SSyncNode* pSyncNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaft.h b/source/libs/sync/inc/syncRaft.h index a247a29fc4..bc5cf26a4c 100644 --- a/source/libs/sync/inc/syncRaft.h +++ b/source/libs/sync/inc/syncRaft.h @@ -27,6 +27,8 @@ extern "C" { #include "syncMessage.h" #include "taosdef.h" +#if 0 + typedef struct SRaftId { SyncNodeId addr; SyncGroupId vgId; @@ -82,6 +84,8 @@ int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak); static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft); +#endif + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 0fdbd7a150..c54f1a1c83 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -34,8 +34,8 @@ extern "C" { typedef struct SRaftStore { SyncTerm currentTerm; SRaftId voteFor; - //FileFd fd; - char path[RAFT_STORE_PATH_LEN]; + // FileFd fd; + char path[RAFT_STORE_PATH_LEN]; } SRaftStore; SRaftStore *raftStoreOpen(const char *path); diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index e71cf55cb1..0447fc246a 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -18,6 +18,14 @@ SSyncEnv *gSyncEnv = NULL; +// local function ----------------- +static void syncEnvTick(void *param, void *tmrId); +static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv); +static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); +static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int mseconds, void *param); +static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer); +// -------------------------------- + int32_t syncEnvStart() { int32_t ret; gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv)); @@ -31,6 +39,41 @@ int32_t syncEnvStop() { return ret; } -static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) { return 0; } +tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void *param) { + return doSyncEnvStartTimer(gSyncEnv, fp, mseconds, param); +} + +void syncEnvStopTimer(tmr_h *pTimer) { doSyncEnvStopTimer(gSyncEnv, pTimer); } + +// local function ----------------- +static void syncEnvTick(void *param, void *tmrId) { + SSyncEnv *pSyncEnv = (SSyncEnv *)param; + sTrace("syncEnvTick ... name:%s ", pSyncEnv->name); + + pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager); +} + +static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) { + snprintf(pSyncEnv->name, sizeof(pSyncEnv->name), "SyncEnv_%p", pSyncEnv); + + // start tmr thread + pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); + + pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager); + + sTrace("SyncEnv start ok, name:%s", pSyncEnv->name); + + return 0; +} + +static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { + taosTmrCleanUp(pSyncEnv->pTimerManager); + return 0; +} + +static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int mseconds, void *param) { + return taosTmrStart(fp, mseconds, pSyncEnv, pSyncEnv->pTimerManager); +} -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { return 0; } +static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer) {} +// -------------------------------- \ No newline at end of file diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index bb20d11e37..34eaf5fe1e 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -22,12 +22,49 @@ SSyncIO *gSyncIO = NULL; +// local function ------------ +static void *syncConsumer(void *param); +static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); +static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void syncTick(void *param, void *tmrId); + +static int32_t doSyncIOStart(SSyncIO *io); +static int32_t doSyncIOStop(SSyncIO *io); +static int32_t doSyncIOPing(SSyncIO *io); +static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static int32_t doSyncIODestroy(SSyncIO *io); +// ---------------------------- + int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { return 0; } -int32_t syncIOStart() { return 0; } +int32_t syncIOStart() { + gSyncIO = syncIOCreate(); + assert(gSyncIO != NULL); + + return 0; +} int32_t syncIOStop() { return 0; } +SSyncIO *syncIOCreate() { + SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); + memset(io, 0, sizeof(*io)); + + io->pMsgQ = taosOpenQueue(); + io->pQset = taosOpenQset(); + taosAddIntoQset(io->pQset, io->pMsgQ, NULL); + + io->start = doSyncIOStart; + io->stop = doSyncIOStop; + io->ping = doSyncIOPing; + io->onMsg = doSyncIOOnMsg; + io->destroy = doSyncIODestroy; + + return io; +} + +// local function ------------ static void syncTick(void *param, void *tmrId) { SSyncIO *io = (SSyncIO *)param; sDebug("syncTick ... "); @@ -46,14 +83,15 @@ static void syncTick(void *param, void *tmrId) { taosWriteQitem(io->pMsgQ, pTemp); - io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); + bool b = taosTmrReset(syncTick, 1000, io, io->syncTimerManager, io->syncTimer); + assert(b); } -void *syncConsumer(void *param) { +static void *syncConsumer(void *param) { SSyncIO *io = param; STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; int type; qall = taosAllocateQall(); @@ -114,23 +152,6 @@ static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { taosWriteQitem(io->pMsgQ, pTemp); } -SSyncIO *syncIOCreate() { - SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); - memset(io, 0, sizeof(*io)); - - io->pMsgQ = taosOpenQueue(); - io->pQset = taosOpenQset(); - taosAddIntoQset(io->pQset, io->pMsgQ, NULL); - - io->start = doSyncIOStart; - io->stop = doSyncIOStop; - io->ping = doSyncIOPing; - io->onMsg = doSyncIOOnMsg; - io->destroy = doSyncIODestroy; - - return io; -} - static int32_t doSyncIOStart(SSyncIO *io) { taosBlockSIGPIPE(); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index bd2952505e..75b40dc625 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -18,9 +18,26 @@ #include "syncInt.h" #include "syncRaft.h" -int32_t syncInit() { return 0; } +static int32_t tsNodeRefId = -1; + +// ------ local funciton --------- +static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg); +static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg); +static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg); +static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg); +static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg); +static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); +static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg); +static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg); +static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); +// --------------------------------- + +int32_t syncInit() { + sTrace("syncInit ok"); + return 0; +} -void syncCleanUp() {} +void syncCleanUp() { sTrace("syncCleanUp ok"); } int64_t syncStart(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); @@ -59,51 +76,57 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { void syncNodeClose(SSyncNode* pSyncNode) { assert(pSyncNode != NULL); - raftClose(pSyncNode->pRaft); free(pSyncNode); } +void syncNodePingAll(SSyncNode* pSyncNode) { sTrace("syncNodePingAll %p ", pSyncNode); } + +void syncNodePingPeers(SSyncNode* pSyncNode) {} + +void syncNodePingSelf(SSyncNode* pSyncNode) {} + +// ------ local funciton --------- static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) { - int32_t ret = ths->pRaft->FpPing(ths->pRaft, pMsg); + int32_t ret = 0; return ret; } static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg) { - int32_t ret = ths->pRaft->FpOnPing(ths->pRaft, pMsg); + int32_t ret = 0; return ret; } static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg) { - int32_t ret = ths->pRaft->FpOnPingReply(ths->pRaft, pMsg); + int32_t ret = 0; return ret; } static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg) { - int32_t ret = ths->pRaft->FpRequestVote(ths->pRaft, pMsg); + int32_t ret = 0; return ret; } static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg) { - int32_t ret = ths->pRaft->FpOnRequestVote(ths->pRaft, pMsg); + int32_t ret = 0; return ret; } static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg) { - int32_t ret = ths->pRaft->FpOnRequestVoteReply(ths->pRaft, pMsg); + int32_t ret = 0; return ret; } static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg) { - int32_t ret = ths->pRaft->FpAppendEntries(ths->pRaft, pMsg); + int32_t ret = 0; return ret; } static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg) { - int32_t ret = ths->pRaft->FpOnAppendEntries(ths->pRaft, pMsg); + int32_t ret = 0; return ret; } static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - int32_t ret = ths->pRaft->FpOnAppendEntriesReply(ths->pRaft, pMsg); + int32_t ret = 0; return ret; } \ No newline at end of file diff --git a/source/libs/sync/src/syncRaft.c b/source/libs/sync/src/syncRaft.c index 9f139730d1..b07c6ea797 100644 --- a/source/libs/sync/src/syncRaft.c +++ b/source/libs/sync/src/syncRaft.c @@ -16,6 +16,8 @@ #include "syncRaft.h" #include "sync.h" +#if 0 + SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) { SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft)); assert(pRaft != NULL); @@ -64,3 +66,5 @@ static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesRepl int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; } static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; } + +#endif \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 964cc78490..1c88be72e7 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -18,24 +18,22 @@ // to complie success: FileIO interface is modified -SRaftStore *raftStoreOpen(const char *path) { return NULL;} +SRaftStore *raftStoreOpen(const char *path) { return NULL; } -static int32_t raftStoreInit(SRaftStore *pRaftStore) { return 0;} +static int32_t raftStoreInit(SRaftStore *pRaftStore) { return 0; } -int32_t raftStoreClose(SRaftStore *pRaftStore) { return 0;} +int32_t raftStoreClose(SRaftStore *pRaftStore) { return 0; } -int32_t raftStorePersist(SRaftStore *pRaftStore) { return 0;} +int32_t raftStorePersist(SRaftStore *pRaftStore) { return 0; } -static bool raftStoreFileExist(char *path) { return 0;} +static bool raftStoreFileExist(char *path) { return 0; } -int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;} +int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0; } -int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;} +int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0; } void raftStorePrint(SRaftStore *pRaftStore) {} - - #if 0 SRaftStore *raftStoreOpen(const char *path) { diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp index 1d050e7094..1ac4357c5a 100644 --- a/source/libs/sync/test/syncEnvTest.cpp +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -34,19 +34,20 @@ void doSync() { } int main() { - //taosInitLog((char*)"syncEnvTest.log", 100000, 10); + // taosInitLog((char*)"syncEnvTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; + int32_t ret; logTest(); - int32_t ret = syncIOStart(); - assert(ret == 0); + // ret = syncIOStart(); + // assert(ret == 0); ret = syncEnvStart(); assert(ret == 0); - doSync(); + // doSync(); while (1) { taosMsleep(1000); diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index e62d051946..110c8c472f 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -13,7 +13,7 @@ void logTest() { sFatal("--- sync log test: fatal"); } -void doSync() { +SSyncNode* doSync() { SSyncFSM* pFsm; SSyncInfo syncInfo; @@ -40,10 +40,17 @@ void doSync() { gSyncIO->FpOnPing = pSyncNode->FpOnPing; gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +void timerPingAll(void *param, void *tmrId) { + SSyncNode *pSyncNode = (SSyncNode*)param; + syncNodePingAll(pSyncNode); } int main() { - //taosInitLog((char*)"syncPingTest.log", 100000, 10); + // taosInitLog((char*)"syncPingTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; @@ -55,7 +62,9 @@ int main() { ret = syncEnvStart(); assert(ret == 0); - doSync(); + SSyncNode* pSyncNode = doSync(); + + pSyncNode->pPingTimer = syncEnvStartTimer(timerPingAll, 1000, pSyncNode); while (1) { taosMsleep(1000); diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index 0f72fd822f..0843a48bb8 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -1,8 +1,8 @@ #include +#include "gtest/gtest.h" #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" -#include "gtest/gtest.h" void *pingFunc(void *param) { SSyncIO *io = (SSyncIO *)param; @@ -15,7 +15,7 @@ void *pingFunc(void *param) { } int main() { - //taosInitLog((char *)"syncTest.log", 100000, 10); + // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; -- GitLab