From 2da7eac62df253138fed334dff7e5afc669a514a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 12 Nov 2022 10:08:28 +0800 Subject: [PATCH] refact: adjust timeout msg build --- source/libs/sync/inc/syncMessage.h | 29 +--- source/libs/sync/inc/syncTimeout.h | 2 +- source/libs/sync/src/syncMain.c | 150 ++++++++--------- source/libs/sync/src/syncMessage.c | 158 +++--------------- source/libs/sync/src/syncTimeout.c | 6 +- source/libs/sync/test/syncTimeoutTest.cpp | 2 +- .../sync/test/sync_test_lib/inc/syncTest.h | 18 ++ .../test/sync_test_lib/src/syncMessageDebug.c | 131 +++++++++++++++ 8 files changed, 254 insertions(+), 242 deletions(-) diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 2110c80df3..3656afd7ab 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -40,27 +40,9 @@ typedef struct SyncTimeout { void* data; // need optimized } SyncTimeout; -SyncTimeout* syncTimeoutBuild(); -SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t vgId, - void* data); -void syncTimeoutDestroy(SyncTimeout* pMsg); -void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen); -void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg); -char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len); -SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len); -void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); -void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); -SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncTimeout2Json(const SyncTimeout* pMsg); -char* syncTimeout2Str(const SyncTimeout* pMsg); +int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, + SSyncNode* pNode); -// for debug ---------------------- -void syncTimeoutPrint(const SyncTimeout* pMsg); -void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg); -void syncTimeoutLog(const SyncTimeout* pMsg); -void syncTimeoutLog2(char* s, const SyncTimeout* pMsg); - -// --------------------------------------------- typedef struct SyncClientRequest { uint32_t bytes; int32_t vgId; @@ -577,9 +559,9 @@ typedef struct SyncLocalCmd { SRaftId srcId; SRaftId destId; - int32_t cmd; - SyncTerm sdNewTerm; // step down new term - SyncIndex fcIndex;// follower commit index + int32_t cmd; + SyncTerm sdNewTerm; // step down new term + SyncIndex fcIndex; // follower commit index } SyncLocalCmd; @@ -628,6 +610,7 @@ bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); // --------------------------------------------- +SyncTimeout* syncTimeoutBuildX(); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index 3139707d54..c6b87a1bca 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -34,7 +34,7 @@ extern "C" { // /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]] // /\ UNCHANGED <> // -int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg); +int32_t syncNodeOnTimer(SSyncNode* ths, SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c9e6885e50..1376143a88 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -142,9 +142,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg); syncHeartbeatReplyDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { - SyncTimeout* pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); - code = syncNodeOnTimer(pSyncNode, pSyncMsg); - syncTimeoutDestroy(pSyncMsg); + code = syncNodeOnTimer(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL); } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { @@ -1797,70 +1795,67 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex } static void syncNodeEqPingTimer(void* param, void* tmrId) { - SSyncNode* pSyncNode = (SSyncNode*)param; - if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) { - SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), - pSyncNode->pingTimerMS, pSyncNode->vgId, pSyncNode); - SRpcMsg rpcMsg; - syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); - sNTrace(pSyncNode, "enqueue ping timer"); - if (pSyncNode->syncEqMsg != NULL) { - int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg); - if (code != 0) { - sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code); - rpcFreeCont(rpcMsg.pCont); - syncTimeoutDestroy(pSyncMsg); - return; - } - } else { - sTrace("syncNodeEqPingTimer pSyncNode->syncEqMsg is NULL"); + if (!syncIsInit()) return; + + SSyncNode* pNode = param; + if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) { + SRpcMsg rpcMsg = {0}; + int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock), + pNode->pingTimerMS, pNode); + if (code != 0) { + sNError(pNode, "failed to build ping msg"); + rpcFreeCont(rpcMsg.pCont); + return; } - syncTimeoutDestroy(pSyncMsg); - if (syncIsInit()) { - taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager, - &pSyncNode->pPingTimer); - } else { - sError("sync env is stop, syncNodeEqPingTimer"); + sNTrace(pNode, "enqueue ping msg"); + code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); + if (code != 0) { + sNError(pNode, "failed to sync enqueue ping msg since %s", terrstr()); + rpcFreeCont(rpcMsg.pCont); + return; } + taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer); } else { sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRId64 ", pingTimerLogicClockUser:%" PRId64, - pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); + pNode->pingTimerLogicClock, pNode->pingTimerLogicClockUser); } } static void syncNodeEqElectTimer(void* param, void* tmrId) { - SElectTimer* pElectTimer = (SElectTimer*)param; - SSyncNode* pSyncNode = pElectTimer->pSyncNode; - - SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pSyncNode->electTimerMS, - pSyncNode->vgId, pSyncNode); - SRpcMsg rpcMsg; - syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); - if (pSyncNode->syncEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) { - int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg); - if (code != 0) { - sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); - rpcFreeCont(rpcMsg.pCont); - syncTimeoutDestroy(pSyncMsg); - taosMemoryFree(pElectTimer); - return; - } - sNTrace(pSyncNode, "eq elect timer lc:%" PRId64, pSyncMsg->logicClock); - } else { - sTrace("syncNodeEqElectTimer syncEqMsg is NULL"); + if (!syncIsInit()) return; + + SElectTimer* pElectTimer = param; + SSyncNode* pNode = pElectTimer->pSyncNode; + + SRpcMsg rpcMsg = {0}; + int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode); + + if (code != 0) { + sNError(pNode, "failed to build elect msg"); + taosMemoryFree(pElectTimer); + return; + } + + SyncTimeout* pTimeout = rpcMsg.pCont; + sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock); + + code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); + if (code != 0) { + sNError(pNode, "failed to sync enqueue elect msg since %s", terrstr()); + rpcFreeCont(rpcMsg.pCont); + taosMemoryFree(pElectTimer); + return; } - syncTimeoutDestroy(pSyncMsg); taosMemoryFree(pElectTimer); #if 0 // reset timer ms - if (syncIsInit() && pSyncNode->electBaseLine > 0) { - pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); - taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager, - &pSyncNode->pElectTimer); + if (syncIsInit() && pNode->electBaseLine > 0) { + pNode->electTimerMS = syncUtilElectRandomMS(pNode->electBaseLine, 2 * pNode->electBaseLine); + taosTmrReset(syncNodeEqElectTimer, pNode->electTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pElectTimer); } else { sError("sync env is stop, syncNodeEqElectTimer"); } @@ -1868,41 +1863,34 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { } static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { - SSyncNode* pSyncNode = (SSyncNode*)param; - sNTrace(pSyncNode, "eq hb timer"); + if (!syncIsInit()) return; - if (pSyncNode->replicaNum > 1) { - if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <= - atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) { - SyncTimeout* pSyncMsg = - syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock), - pSyncNode->heartbeatTimerMS, pSyncNode->vgId, pSyncNode); - SRpcMsg rpcMsg; - syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); - sNTrace(pSyncNode, "enqueue heartbeat timer"); - if (pSyncNode->syncEqMsg != NULL) { - int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg); - if (code != 0) { - sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code); - rpcFreeCont(rpcMsg.pCont); - syncTimeoutDestroy(pSyncMsg); - return; - } - } else { - sError("vgId:%d, enqueue msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId); + SSyncNode* pNode = param; + if (pNode->replicaNum > 1) { + if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) { + SRpcMsg rpcMsg = {0}; + int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock), + pNode->heartbeatTimerMS, pNode); + + if (code != 0) { + sNError(pNode, "failed to build heartbeat msg"); + return; } - syncTimeoutDestroy(pSyncMsg); - if (syncIsInit()) { - taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager, - &pSyncNode->pHeartbeatTimer); - } else { - sError("sync env is stop, syncNodeEqHeartbeatTimer"); + sNTrace(pNode, "enqueue heartbeat timer"); + code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); + if (code != 0) { + sNError(pNode, "failed to enqueue heartbeat msg since %s", terrstr()); + rpcFreeCont(rpcMsg.pCont); + return; } + + taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager, + &pNode->pHeartbeatTimer); + } else { - sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64 - "", - pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); + sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64, + pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser); } } } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 74f4a19f81..55f80b4cc1 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -20,139 +20,28 @@ #include "syncUtil.h" #include "tcoding.h" -// ---- message process SyncTimeout---- -SyncTimeout* syncTimeoutBuild() { - uint32_t bytes = sizeof(SyncTimeout); - SyncTimeout* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->msgType = TDMT_SYNC_TIMEOUT; - return pMsg; -} - -SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t vgId, - void* data) { - SyncTimeout* pMsg = syncTimeoutBuild(); - pMsg->vgId = vgId; - pMsg->timeoutType = timeoutType; - pMsg->logicClock = logicClock; - pMsg->timerMS = timerMS; - pMsg->data = data; - return pMsg; -} - -void syncTimeoutDestroy(SyncTimeout* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} - -void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); -} - -char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncTimeoutSerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - -SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncTimeout* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncTimeoutDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} - -void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncTimeoutSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) { - syncTimeoutDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} - -SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncTimeout* pMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pMsg != NULL); - return pMsg; -} - -cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->logicClock); - cJSON_AddStringToObject(pRoot, "logicClock", u64buf); - cJSON_AddNumberToObject(pRoot, "timerMS", pMsg->timerMS); - snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); - cJSON_AddStringToObject(pRoot, "data", u64buf); +int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, + SSyncNode* pNode) { + int32_t bytes = sizeof(SyncTimeout); + pTimeoutRpcMsg->pCont = rpcMallocCont(bytes); + pTimeoutRpcMsg->msgType = TDMT_SYNC_TIMEOUT; + pTimeoutRpcMsg->contLen = bytes; + if (pTimeoutRpcMsg->pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot); - return pJson; -} - -char* syncTimeout2Str(const SyncTimeout* pMsg) { - cJSON* pJson = syncTimeout2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -// for debug ---------------------- -void syncTimeoutPrint(const SyncTimeout* pMsg) { - char* serialized = syncTimeout2Str(pMsg); - printf("syncTimeoutPrint | len:%zu | %s \n", strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg) { - char* serialized = syncTimeout2Str(pMsg); - printf("syncTimeoutPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncTimeoutLog(const SyncTimeout* pMsg) { - char* serialized = syncTimeout2Str(pMsg); - sTrace("syncTimeoutLog | len:%d | %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncTimeout2Str(pMsg); - sTrace("syncTimeoutLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } + SyncTimeout* pTimeout = pTimeoutRpcMsg->pCont; + pTimeout->bytes = bytes; + pTimeout->msgType = TDMT_SYNC_TIMEOUT; + pTimeout->vgId = pNode->vgId; + pTimeout->timeoutType = timeoutType; + pTimeout->logicClock = logicClock; + pTimeout->timerMS = timerMS; + pTimeout->data = pNode; + return 0; } - -// ---- message process SyncClientRequest---- SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) { uint32_t bytes = sizeof(SyncClientRequest) + dataLen; SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes); @@ -166,6 +55,8 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR bool isWeak, int32_t vgId) { int32_t bytes = sizeof(SyncClientRequest) + pOriginalRpcMsg->contLen; pClientRequestRpcMsg->pCont = rpcMallocCont(bytes); + pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; + pClientRequestRpcMsg->contLen = bytes; if (pClientRequestRpcMsg->pCont == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -181,14 +72,15 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR pClientRequest->dataLen = pOriginalRpcMsg->contLen; memcpy(pClientRequest->data, (char*)pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); - pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; - pClientRequestRpcMsg->contLen = bytes; + return 0; } int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId) { int32_t bytes = sizeof(SyncClientRequest) + pEntry->bytes; pClientRequestRpcMsg->pCont = rpcMallocCont(bytes); + pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; + pClientRequestRpcMsg->contLen = bytes; if (pClientRequestRpcMsg->pCont == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -202,8 +94,6 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const pClientRequest->dataLen = pEntry->bytes; memcpy(pClientRequest->data, (char*)pEntry, pEntry->bytes); - pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; - pClientRequestRpcMsg->contLen = bytes; return 0; } @@ -2343,4 +2233,4 @@ void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg) { sTrace("syncLocalCmdLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); taosMemoryFree(serialized); } -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 91d807319b..abb6f0c11a 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -85,8 +85,10 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { return 0; } -int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) { - int32_t ret = 0; +int32_t syncNodeOnTimer(SSyncNode* ths, SRpcMsg* pRpc) { + int32_t ret = 0; + SyncTimeout* pMsg = pRpc->pCont; + syncLogRecvTimer(ths, pMsg, ""); if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { diff --git a/source/libs/sync/test/syncTimeoutTest.cpp b/source/libs/sync/test/syncTimeoutTest.cpp index ed0742f499..211445b5fe 100644 --- a/source/libs/sync/test/syncTimeoutTest.cpp +++ b/source/libs/sync/test/syncTimeoutTest.cpp @@ -28,7 +28,7 @@ void test2() { uint32_t len = pMsg->bytes; char *serialized = (char *)taosMemoryMalloc(len); syncTimeoutSerialize(pMsg, serialized, len); - SyncTimeout *pMsg2 = syncTimeoutBuild(); + SyncTimeout *pMsg2 = syncTimeoutBuildX(); syncTimeoutDeserialize(serialized, len, pMsg2); syncTimeoutLog2((char *)"test2: syncTimeoutSerialize -> syncTimeoutDeserialize ", pMsg2); diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index f531304178..f84cc5d053 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -213,6 +213,24 @@ int32_t syncNodePingSelf(SSyncNode* pSyncNode); int32_t syncNodePingPeers(SSyncNode* pSyncNode); int32_t syncNodePingAll(SSyncNode* pSyncNode); +SyncTimeout* syncTimeoutBuildX(); +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t vgId, + void* data); +void syncTimeoutDestroy(SyncTimeout* pMsg); +void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen); +void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg); +char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len); +SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len); +void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); +void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); +SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncTimeout2Json(const SyncTimeout* pMsg); +char* syncTimeout2Str(const SyncTimeout* pMsg); +void syncTimeoutPrint(const SyncTimeout* pMsg); +void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg); +void syncTimeoutLog(const SyncTimeout* pMsg); +void syncTimeoutLog2(char* s, const SyncTimeout* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index c56d983e6e..fcf2d072d2 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -1046,3 +1046,134 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) { taosMemoryFree(serialized); } } + +// ---- message process SyncTimeout---- +SyncTimeout* syncTimeoutBuildX() { + uint32_t bytes = sizeof(SyncTimeout); + SyncTimeout* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = TDMT_SYNC_TIMEOUT; + return pMsg; +} + +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t vgId, + void* data) { + SyncTimeout* pMsg = syncTimeoutBuildX(); + pMsg->vgId = vgId; + pMsg->timeoutType = timeoutType; + pMsg->logicClock = logicClock; + pMsg->timerMS = timerMS; + pMsg->data = data; + return pMsg; +} + +char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncTimeoutSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +void syncTimeoutDestroy(SyncTimeout* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncTimeout* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncTimeoutDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncTimeoutSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) { + syncTimeoutDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncTimeout* pMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->logicClock); + cJSON_AddStringToObject(pRoot, "logicClock", u64buf); + cJSON_AddNumberToObject(pRoot, "timerMS", pMsg->timerMS); + snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); + cJSON_AddStringToObject(pRoot, "data", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot); + return pJson; +} + +char* syncTimeout2Str(const SyncTimeout* pMsg) { + cJSON* pJson = syncTimeout2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncTimeoutPrint(const SyncTimeout* pMsg) { + char* serialized = syncTimeout2Str(pMsg); + printf("syncTimeoutPrint | len:%zu | %s \n", strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg) { + char* serialized = syncTimeout2Str(pMsg); + printf("syncTimeoutPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncTimeoutLog(const SyncTimeout* pMsg) { + char* serialized = syncTimeout2Str(pMsg); + sTrace("syncTimeoutLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncTimeout2Str(pMsg); + sTrace("syncTimeoutLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} -- GitLab