diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 40fa31d96960540943436a0a11df9f347eb58d2b..a948de8ac16d91d03a1eb1adb95586b94200f861 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -31,11 +31,11 @@ extern "C" { typedef struct SSyncIO { STaosQueue *pMsgQ; - STaosQset * pQset; + STaosQset *pQset; pthread_t consumerTid; - void * serverRpc; - void * clientRpc; + void *serverRpc; + void *clientRpc; SEpSet myAddr; void *ioTimerTickQ; @@ -49,6 +49,7 @@ typedef struct SSyncIO { int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg); int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg); + int32_t (*FpOnSyncTimeout)(SSyncNode *pSyncNode, SyncTimeout *pMsg); int8_t isStart; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 826862450126889c94a75be70666ab3c7665327e..d67b419b2465a8d7c36a05821ecbc11adfef44b2 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -71,6 +71,9 @@ extern int32_t sDebugFlag; struct SRaft; typedef struct SRaft SRaft; +struct SyncTimeout; +typedef struct SyncTimeout SyncTimeout; + struct SyncPing; typedef struct SyncPing SyncPing; @@ -174,6 +177,7 @@ typedef struct SSyncNode { int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); + int32_t (*FpOnTimeout)(SSyncNode* pSyncNode, SyncTimeout* pMsg); } SSyncNode; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 5aa27e616b6e71efb87567aa3104a4f2a00d97d5..a2e745b3d9bc132d3bf44419d47e18476cd5ac06 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -39,12 +39,26 @@ typedef enum ESyncMessageType { SYNC_REQUEST_VOTE_REPLY = 111, SYNC_APPEND_ENTRIES = 113, SYNC_APPEND_ENTRIES_REPLY = 115, + SYNC_TIMEOUT = 117, } ESyncMessageType; // --------------------------------------------- cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); cJSON* syncRpcUnknownMsg2Json(); +// --------------------------------------------- +typedef enum ESyncTimeoutType { + SYNC_TIMEOUT_PING = 0, + SYNC_TIMEOUT_ELECTION, + SYNC_TIMEOUT_HEARTBEAT, + +} ESyncTimeoutType; + +typedef struct SyncTimeout { + ESyncTimeoutType type; + void* data; +} SyncTimeout; + // --------------------------------------------- typedef struct SyncPing { uint32_t bytes; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 52cc0cda50df05d1721397eb28efc2c67fd2ae0f..ffd982f2331229b88e960d6c78148899c9948b81 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -80,7 +80,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { pMsg->msgType, pMsg->contLen); { cJSON *pJson = syncRpcMsg2Json(pMsg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); sTrace("process syncMessage send: pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); @@ -211,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) { SSyncIO *io = param; STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; int type; qall = taosAllocateQall(); @@ -244,7 +244,9 @@ static void *syncIOConsumerFunc(void *param) { io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); } - } else { + + } else if (pRpcMsg->msgType == SYNC_TIMEOUT) { + } else { ; } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4183bfbe9aa4bf22280556e4a0d32469756369ab..57bc75473559f22b09476159aab750a3b6d416bb 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -37,6 +37,7 @@ static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); // --------------------------------- int32_t syncInit() { @@ -326,6 +327,11 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR return ret; } +static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { + int32_t ret = 0; + return ret; +} + static void syncNodePingTimerCb(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_8(&pSyncNode->pingTimerEnable)) { diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index 57e2da619307fe1cd8ad8a3fec95fc1ead4da3cc..0bf43f933ee85610b6280c88f6120c2e8677e221 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -85,7 +85,7 @@ int main(int argc, char** argv) { for (int i = 0; i < 10; ++i) { SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->raftId, &pSyncNode->raftId); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); taosMsleep(1000);