From fe1f280ce4440c1d71fdda5897eb5ae1c2f161d5 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 2 Mar 2022 20:43:03 +0800 Subject: [PATCH] sync modify timer --- source/libs/sync/inc/syncMessage.h | 4 ++ source/libs/sync/src/syncIO.c | 5 ++- source/libs/sync/src/syncMain.c | 15 +++++++ source/libs/sync/src/syncMessage.c | 54 +++++++++++++++++++++++++- source/libs/sync/src/syncUtil.c | 1 + source/libs/sync/test/syncPingTest.cpp | 3 +- 6 files changed, 79 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index bb9ec65d3b..be53559e8a 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -102,6 +102,10 @@ void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg); cJSON* syncPingReply2Json(const SyncPingReply* pMsg); +SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); + +SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId); + typedef struct SyncClientRequest { ESyncMessageType msgType; char* data; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 590466d1cc..abb52b42d0 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -226,7 +226,10 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen); syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg); - io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); + + if (io->FpOnSyncPingReply != NULL) { + io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); + } } else { ; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ce3dba4c0e..49fac038da 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -240,11 +240,26 @@ static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { cJSON_Delete(pJson); } + SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->raftId, &pMsg->srcId); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pMsgReply, &rpcMsg); + syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + return ret; } static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { int32_t ret = 0; + sTrace("<-- syncNodeOnPingReplyCb -->"); + + { + cJSON* pJson = syncPingReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodeOnPingReplyCb syncNodePing pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + return ret; } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 1d1b2e6965..a26c8401b2 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -15,6 +15,7 @@ #include "syncMessage.h" #include "syncRaft.h" +#include "syncUtil.h" #include "tcoding.h" void onMessage(SRaft* pRaft, void* pMsg) {} @@ -65,11 +66,29 @@ cJSON* syncPing2Json(const SyncPing* pMsg) { cJSON* pSrcId = cJSON_CreateObject(); cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); @@ -101,7 +120,7 @@ SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { SyncPingReply* pMsg = malloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; - pMsg->msgType = SYNC_PING; + pMsg->msgType = SYNC_PING_REPLY; pMsg->dataLen = dataLen; } @@ -123,6 +142,7 @@ void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg } void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); @@ -140,11 +160,29 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { cJSON* pSrcId = cJSON_CreateObject(); cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); @@ -156,6 +194,20 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { return pJson; } +SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) { + uint32_t dataLen = strlen(str) + 1; + SyncPingReply* pMsg = syncPingReplyBuild(dataLen); + pMsg->srcId = *srcId; + pMsg->destId = *destId; + snprintf(pMsg->data, pMsg->dataLen, "%s", str); + return pMsg; +} + +SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) { + SyncPingReply* pMsg = syncPingReplyBuild2(srcId, destId, "pang"); + return pMsg; +} + #if 0 void syncPingSerialize(const SyncPing* pMsg, char** ppBuf, uint32_t* bufLen) { *bufLen = sizeof(SyncPing) + pMsg->dataLen; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index a3fbfaf905..b4959a810b 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -38,6 +38,7 @@ void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port) { void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet) { pEpSet->inUse = 0; + pEpSet->numOfEps = 0; addEpIntoEpSet(pEpSet, pNodeInfo->nodeFqdn, pNodeInfo->nodePort); } diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 002838b010..24d9ead5e3 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -25,7 +25,7 @@ SSyncNode* doSync() { SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = 0; - pCfg->replicaNum = 1; + pCfg->replicaNum = 2; pCfg->nodeInfo[0].nodePort = 7010; snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1"); @@ -68,6 +68,7 @@ int main() { SSyncNode* pSyncNode = doSync(); gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); -- GitLab