diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 960afe37b08ca83779480eb6c3dcb28881aa52f6..c4735d448131c0e54dc3944c687081fe49c02b57 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -28,12 +28,16 @@ extern "C" { #include "trpc.h" #include "ttimer.h" +#define TIMER_MAX_MS 0x7FFFFFFF + typedef struct SSyncEnv { tmr_h pEnvTickTimer; tmr_h pTimerManager; char name[128]; } SSyncEnv; +extern SSyncEnv *gSyncEnv; + int32_t syncEnvStart(); int32_t syncEnvStop(); diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 49edeb638bd646e5d5bed2fb64aaf9aef20b2187..ec887aa85523ddf87cfbc55b6a5fe741c9630235 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -30,10 +30,10 @@ extern "C" { #include "trpc.h" typedef struct SSyncIO { - void * serverRpc; - void * clientRpc; + void *serverRpc; + void *clientRpc; STaosQueue *pMsgQ; - STaosQset * pQset; + STaosQset *pQset; pthread_t tid; int8_t isStart; @@ -56,10 +56,9 @@ typedef struct SSyncIO { extern SSyncIO *gSyncIO; -int32_t syncIOStart(); -int32_t syncIOStop(); -int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg); -SSyncIO *syncIOCreate(); +int32_t syncIOStart(); +int32_t syncIOStop(); +int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index a8451b3dc460ff8197e4caef3d45ce5570dbff81..e02e2d7374b4a67317e7399d5da4e0cc91c22f0f 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -88,6 +88,9 @@ typedef struct SyncAppendEntries SyncAppendEntries; struct SyncAppendEntriesReply; typedef struct SyncAppendEntriesReply SyncAppendEntriesReply; +struct SSyncEnv; +typedef struct SSyncEnv SSyncEnv; + typedef struct SRaftId { SyncNodeId addr; SyncGroupId vgId; @@ -103,32 +106,46 @@ typedef struct SSyncNode { SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; + SNodeInfo me; + SNodeInfo peers[TSDB_MAX_REPLICA]; + int32_t peersNum; + ESyncRole role; SRaftId raftId; SSyncFSM* pFsm; - tmr_h pPingTimer; - tmr_h pElectionTimer; - tmr_h pHeartbeatTimer; - - int32_t (*FpPing)(struct SSyncNode* ths, const SyncPing* pMsg); - + tmr_h pPingTimer; + int32_t pingTimerMS; + uint8_t pingTimerStart; + TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp + uint64_t pingTimerCounter; + + tmr_h pElectTimer; + int32_t electTimerMS; + uint8_t electTimerStart; + TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp + uint64_t electTimerCounter; + + tmr_h pHeartbeatTimer; + int32_t heartbeatTimerMS; + uint8_t heartbeatTimerStart; + TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp + uint64_t heartbeatTimerCounter; + + // callback int32_t (*FpOnPing)(struct SSyncNode* ths, SyncPing* pMsg); int32_t (*FpOnPingReply)(struct SSyncNode* ths, SyncPingReply* pMsg); - int32_t (*FpRequestVote)(struct SSyncNode* ths, const SyncRequestVote* pMsg); - int32_t (*FpOnRequestVote)(struct SSyncNode* ths, SyncRequestVote* pMsg); int32_t (*FpOnRequestVoteReply)(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); - int32_t (*FpAppendEntries)(struct SSyncNode* ths, const SyncAppendEntries* pMsg); - int32_t (*FpOnAppendEntries)(struct SSyncNode* ths, SyncAppendEntries* pMsg); int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); + // passed from outside int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); } SSyncNode; @@ -143,6 +160,10 @@ void syncNodePingPeers(SSyncNode* pSyncNode); void syncNodePingSelf(SSyncNode* pSyncNode); +int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); + +int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index 0447fc246aebe467dabeab26a6a49c5a63876afc..bfdf2a9c5eebdd588faa2e5ff9a7875b9c9ad993 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -59,7 +59,7 @@ static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) { // start tmr thread pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); - pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager); + // pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager); sTrace("SyncEnv start ok, name:%s", pSyncEnv->name); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 34eaf5fe1e653fcb0555d47da6942d20f76fcf69..5c27f9ec3ac957a6e3460453a1ef72391d6feb9e 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -23,17 +23,18 @@ SSyncIO *gSyncIO = NULL; // local function ------------ -static void *syncConsumer(void *param); -static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); -static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); -static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); -static void syncTick(void *param, void *tmrId); - static int32_t doSyncIOStart(SSyncIO *io); static int32_t doSyncIOStop(SSyncIO *io); static int32_t doSyncIOPing(SSyncIO *io); static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t doSyncIODestroy(SSyncIO *io); + +static SSyncIO *syncIOCreate(); +static void * syncIOConsumer(void *param); +static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); +static void syncIODoReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void syncIODoRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void syncIOTick(void *param, void *tmrId); // ---------------------------- int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { return 0; } @@ -47,27 +48,10 @@ int32_t syncIOStart() { int32_t syncIOStop() { return 0; } -SSyncIO *syncIOCreate() { - SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); - memset(io, 0, sizeof(*io)); - - io->pMsgQ = taosOpenQueue(); - io->pQset = taosOpenQset(); - taosAddIntoQset(io->pQset, io->pMsgQ, NULL); - - io->start = doSyncIOStart; - io->stop = doSyncIOStop; - io->ping = doSyncIOPing; - io->onMsg = doSyncIOOnMsg; - io->destroy = doSyncIODestroy; - - return io; -} - // local function ------------ -static void syncTick(void *param, void *tmrId) { +static void syncIOTick(void *param, void *tmrId) { SSyncIO *io = (SSyncIO *)param; - sDebug("syncTick ... "); + sDebug("syncIOTick ... "); SRpcMsg rpcMsg; rpcMsg.pCont = rpcMallocCont(10); @@ -83,15 +67,15 @@ static void syncTick(void *param, void *tmrId) { taosWriteQitem(io->pMsgQ, pTemp); - bool b = taosTmrReset(syncTick, 1000, io, io->syncTimerManager, io->syncTimer); + bool b = taosTmrReset(syncIOTick, 1000, io, io->syncTimerManager, io->syncTimer); assert(b); } -static void *syncConsumer(void *param) { +static void *syncIOConsumer(void *param) { SSyncIO *io = param; STaosQall *qall; - SRpcMsg *pRpcMsg, rpcMsg; + SRpcMsg * pRpcMsg, rpcMsg; int type; qall = taosAllocateQall(); @@ -129,19 +113,19 @@ static void *syncConsumer(void *param) { return NULL; } -static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { +static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { // app shall retrieve the auth info based on meterID from DB or a data file // demo code here only for simple demo int ret = 0; return ret; } -static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - sDebug("processResponse ... "); +static void syncIODoReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + sDebug("syncIODoReply ... "); rpcFreeCont(pMsg->pCont); } -static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { +static void syncIODoRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SSyncIO *io = pParent; SRpcMsg *pTemp; @@ -152,6 +136,23 @@ static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { taosWriteQitem(io->pMsgQ, pTemp); } +static SSyncIO *syncIOCreate() { + SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); + memset(io, 0, sizeof(*io)); + + io->pMsgQ = taosOpenQueue(); + io->pQset = taosOpenQset(); + taosAddIntoQset(io->pQset, io->pMsgQ, NULL); + + io->start = doSyncIOStart; + io->stop = doSyncIOStop; + io->ping = doSyncIOPing; + io->onMsg = doSyncIOOnMsg; + io->destroy = doSyncIODestroy; + + return io; +} + static int32_t doSyncIOStart(SSyncIO *io) { taosBlockSIGPIPE(); @@ -164,7 +165,7 @@ static int32_t doSyncIOStart(SSyncIO *io) { rpcInit.localPort = 0; rpcInit.label = "SYNC-IO-CLIENT"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processResponse; + rpcInit.cfp = syncIODoReply; rpcInit.sessions = 100; rpcInit.idleTime = 100; rpcInit.user = "sync-io"; @@ -187,10 +188,10 @@ static int32_t doSyncIOStart(SSyncIO *io) { rpcInit.localPort = 38000; rpcInit.label = "SYNC-IO-SERVER"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processRequestMsg; + rpcInit.cfp = syncIODoRequest; rpcInit.sessions = 1000; rpcInit.idleTime = 2 * 1500; - rpcInit.afp = retrieveAuthInfo; + rpcInit.afp = syncIOAuth; rpcInit.parent = io; rpcInit.connType = TAOS_CONN_SERVER; @@ -206,7 +207,7 @@ static int32_t doSyncIOStart(SSyncIO *io) { // start consumer thread { - if (pthread_create(&io->tid, NULL, syncConsumer, io) != 0) { + if (pthread_create(&io->tid, NULL, syncIOConsumer, io) != 0) { sError("failed to create sync consumer thread since %s", strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -215,7 +216,7 @@ static int32_t doSyncIOStart(SSyncIO *io) { // start tmr thread io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); - io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); + io->syncTimer = taosTmrStart(syncIOTick, 1000, io, io->syncTimerManager); return 0; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 75b40dc625538ba9420f5445974bd108e3803a22..7cfc470f60c7572d2dc607d226a407ac43dbb931 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -15,6 +15,7 @@ #include #include "sync.h" +#include "syncEnv.h" #include "syncInt.h" #include "syncRaft.h" @@ -30,6 +31,7 @@ static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVote static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg); static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg); static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); +static void syncNodePingTimerCb(void* param, void* tmrId); // --------------------------------- int32_t syncInit() { @@ -58,16 +60,19 @@ void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); assert(pSyncNode != NULL); + memset(pSyncNode, 0, sizeof(SSyncNode)); - pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; + pSyncNode->pPingTimer = NULL; + pSyncNode->pingTimerMS = 1000; + atomic_store_8(&pSyncNode->pingTimerStart, 0); + pSyncNode->FpPingTimer = syncNodePingTimerCb; + pSyncNode->pingTimerCounter = 0; - pSyncNode->FpPing = doSyncNodePing; + pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; pSyncNode->FpOnPing = onSyncNodePing; pSyncNode->FpOnPingReply = onSyncNodePingReply; - pSyncNode->FpRequestVote = doSyncNodeRequestVote; pSyncNode->FpOnRequestVote = onSyncNodeRequestVote; pSyncNode->FpOnRequestVoteReply = onSyncNodeRequestVoteReply; - pSyncNode->FpAppendEntries = doSyncNodeAppendEntries; pSyncNode->FpOnAppendEntries = onSyncNodeAppendEntries; pSyncNode->FpOnAppendEntriesReply = onSyncNodeAppendEntriesReply; @@ -85,6 +90,25 @@ void syncNodePingPeers(SSyncNode* pSyncNode) {} void syncNodePingSelf(SSyncNode* pSyncNode) {} +int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { + if (pSyncNode->pPingTimer == NULL) { + pSyncNode->pPingTimer = + taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager); + } else { + taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + } + + atomic_store_8(&pSyncNode->pingTimerStart, 1); + return 0; +} + +int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { + atomic_store_8(&pSyncNode->pingTimerStart, 0); + pSyncNode->pingTimerCounter = TIMER_MAX_MS; + return 0; +} + // ------ local funciton --------- static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) { int32_t ret = 0; @@ -129,4 +153,20 @@ static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; return ret; +} + +static void syncNodePingTimerCb(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + if (atomic_load_8(&pSyncNode->pingTimerStart)) { + ++(pSyncNode->pingTimerCounter); + // pSyncNode->pingTimerMS += 100; + + sTrace("pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, tmrId:%p ", + pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId); + + taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + + syncNodePingSelf(pSyncNode); + } } \ No newline at end of file diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 110c8c472ffa3c0ac7afd7713a651d3496ed1b06..bf0d342ca4bb391dc9801aabcace60c07e7ba518 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -44,8 +44,8 @@ SSyncNode* doSync() { return pSyncNode; } -void timerPingAll(void *param, void *tmrId) { - SSyncNode *pSyncNode = (SSyncNode*)param; +void timerPingAll(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; syncNodePingAll(pSyncNode); } @@ -64,7 +64,13 @@ int main() { SSyncNode* pSyncNode = doSync(); - pSyncNode->pPingTimer = syncEnvStartTimer(timerPingAll, 1000, pSyncNode); + ret = syncNodeStartPingTimer(pSyncNode); + assert(ret == 0); + + taosMsleep(5000); + + ret = syncNodeStopPingTimer(pSyncNode); + assert(ret == 0); while (1) { taosMsleep(1000);