From 98b35306998cd9e0be9c27be9b782b8d09ec12aa Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 4 Mar 2022 16:54:25 +0800 Subject: [PATCH] sync timeout --- source/libs/sync/inc/syncIO.h | 7 ++++--- source/libs/sync/inc/syncInt.h | 4 ++++ source/libs/sync/inc/syncMessage.h | 14 ++++++++++++++ source/libs/sync/src/syncIO.c | 8 +++++--- source/libs/sync/src/syncMain.c | 6 ++++++ source/libs/sync/test/syncEnqTest.cpp | 2 +- 6 files changed, 34 insertions(+), 7 deletions(-) diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 40fa31d969..a948de8ac1 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -31,11 +31,11 @@ extern "C" { typedef struct SSyncIO { STaosQueue *pMsgQ; - STaosQset * pQset; + STaosQset *pQset; pthread_t consumerTid; - void * serverRpc; - void * clientRpc; + void *serverRpc; + void *clientRpc; SEpSet myAddr; void *ioTimerTickQ; @@ -49,6 +49,7 @@ typedef struct SSyncIO { int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg); int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg); + int32_t (*FpOnSyncTimeout)(SSyncNode *pSyncNode, SyncTimeout *pMsg); int8_t isStart; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8268624501..d67b419b24 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -71,6 +71,9 @@ extern int32_t sDebugFlag; struct SRaft; typedef struct SRaft SRaft; +struct SyncTimeout; +typedef struct SyncTimeout SyncTimeout; + struct SyncPing; typedef struct SyncPing SyncPing; @@ -174,6 +177,7 @@ typedef struct SSyncNode { int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); + int32_t (*FpOnTimeout)(SSyncNode* pSyncNode, SyncTimeout* pMsg); } SSyncNode; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 5aa27e616b..a2e745b3d9 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -39,12 +39,26 @@ typedef enum ESyncMessageType { SYNC_REQUEST_VOTE_REPLY = 111, SYNC_APPEND_ENTRIES = 113, SYNC_APPEND_ENTRIES_REPLY = 115, + SYNC_TIMEOUT = 117, } ESyncMessageType; // --------------------------------------------- cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); cJSON* syncRpcUnknownMsg2Json(); +// --------------------------------------------- +typedef enum ESyncTimeoutType { + SYNC_TIMEOUT_PING = 0, + SYNC_TIMEOUT_ELECTION, + SYNC_TIMEOUT_HEARTBEAT, + +} ESyncTimeoutType; + +typedef struct SyncTimeout { + ESyncTimeoutType type; + void* data; +} SyncTimeout; + // --------------------------------------------- typedef struct SyncPing { uint32_t bytes; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 52cc0cda50..ffd982f233 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -80,7 +80,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { pMsg->msgType, pMsg->contLen); { cJSON *pJson = syncRpcMsg2Json(pMsg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); sTrace("process syncMessage send: pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); @@ -211,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) { SSyncIO *io = param; STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; int type; qall = taosAllocateQall(); @@ -244,7 +244,9 @@ static void *syncIOConsumerFunc(void *param) { io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); } - } else { + + } else if (pRpcMsg->msgType == SYNC_TIMEOUT) { + } else { ; } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4183bfbe9a..57bc754735 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -37,6 +37,7 @@ static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); // --------------------------------- int32_t syncInit() { @@ -326,6 +327,11 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR return ret; } +static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { + int32_t ret = 0; + return ret; +} + static void syncNodePingTimerCb(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_8(&pSyncNode->pingTimerEnable)) { diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index 57e2da6193..0bf43f933e 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -85,7 +85,7 @@ int main(int argc, char** argv) { for (int i = 0; i < 10; ++i) { SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->raftId, &pSyncNode->raftId); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); taosMsleep(1000); -- GitLab