diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index f4890206fe572be40704959bb4a846f9363fb25a..7ca552f179fe6da0bd189797ff0c33817c91dbf0 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -274,8 +274,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT, "sync-pre-snapshot", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT, "sync-pre-snapshot", NULL, NULL) // no longer used + TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index a322f7680042383db31a0e41693ab78c610709b8..16fe6c1b91f8a7788ad4a48993b337fe6bc7b160 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -195,7 +195,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 4d66b3713791d1128b9c159806b1cb2d0bb6043f..a38b7c10ffc3d35ab8d2be10e899aaba32ae36b7 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -112,7 +112,6 @@ typedef struct SyncAppendEntriesReply { int64_t startTime; } SyncAppendEntriesReply; -// --------------------------------------------- typedef struct SyncHeartbeat { uint32_t bytes; int32_t vgId; @@ -149,28 +148,8 @@ typedef struct SyncPreSnapshot { // private data SyncTerm term; - } SyncPreSnapshot; -SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId); -void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg); -void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen); -void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg); -char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len); -SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len); -void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg); -void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg); -SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg); -char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg); - -// for debug ---------------------- -void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg); -void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg); -void syncPreSnapshotLog(const SyncPreSnapshot* pMsg); -void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg); - -// --------------------------------------------- typedef struct SyncPreSnapshotReply { uint32_t bytes; int32_t vgId; @@ -181,28 +160,8 @@ typedef struct SyncPreSnapshotReply { // private data SyncTerm term; SyncIndex snapStart; - } SyncPreSnapshotReply; -SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId); -void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg); -void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen); -void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg); -char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len); -SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len); -void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg); -void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg); -SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg); -char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg); - -// for debug ---------------------- -void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg); -void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg); -void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg); -void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg); - -// --------------------------------------------- typedef struct SyncApplyMsg { uint32_t bytes; int32_t vgId; @@ -375,9 +334,6 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg); -int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg); - int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg); int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg); @@ -404,6 +360,8 @@ int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId); +int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId); +int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index cc68d370a86c87647d8a99737516a1cb1d6c5e46..26bedb2b6614f19f2e3d5b5c71a240deb58a413b 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -189,301 +189,38 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) { return 0; } -// ---- message process SyncPreSnapshot---- -SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId) { - uint32_t bytes = sizeof(SyncPreSnapshot); - SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->vgId = vgId; +int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId) { + int32_t bytes = sizeof(SyncPreSnapshot); + pMsg->pCont = rpcMallocCont(bytes); pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT; - return pMsg; -} - -void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} - -void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); -} - -char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncPreSnapshotSerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - -SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncPreSnapshotDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} - -void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncPreSnapshotSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg) { - syncPreSnapshotDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} - -SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncPreSnapshot* pMsg = syncPreSnapshotDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pMsg != NULL); - return pMsg; -} - -cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); + pMsg->contLen = bytes; + if (pMsg->pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncPreSnapshot", pRoot); - return pJson; -} - -char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg) { - cJSON* pJson = syncPreSnapshot2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg) { - char* serialized = syncPreSnapshot2Str(pMsg); - printf("syncPreSnapshotPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg) { - char* serialized = syncPreSnapshot2Str(pMsg); - printf("syncPreSnapshotPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncPreSnapshotLog(const SyncPreSnapshot* pMsg) { - char* serialized = syncPreSnapshot2Str(pMsg); - sTrace("syncPreSnapshotLog | len:%d | %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncPreSnapshot2Str(pMsg); - sTrace("syncPreSnapshotLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } + SyncPreSnapshot* pPreSnapshot = pMsg->pCont; + pPreSnapshot->bytes = bytes; + pPreSnapshot->msgType = TDMT_SYNC_PRE_SNAPSHOT; + pPreSnapshot->vgId = vgId; + return 0; } -// ---- message process SyncPreSnapshotReply---- -SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId) { - uint32_t bytes = sizeof(SyncPreSnapshotReply); - SyncPreSnapshotReply* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->vgId = vgId; +int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) { + int32_t bytes = sizeof(SyncPreSnapshotReply); + pMsg->pCont = rpcMallocCont(bytes); pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY; - return pMsg; -} - -void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} - -void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); -} - -char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncPreSnapshotReplySerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - -SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncPreSnapshotReply* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncPreSnapshotReplyDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} - -void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncPreSnapshotReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg) { - syncPreSnapshotReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} - -SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncPreSnapshotReply* pMsg = syncPreSnapshotReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pMsg != NULL); - return pMsg; -} - -cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapStart); - cJSON_AddStringToObject(pRoot, "snap-start", u64buf); + pMsg->contLen = bytes; + if (pMsg->pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncPreSnapshotReply", pRoot); - return pJson; -} - -char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg) { - cJSON* pJson = syncPreSnapshotReply2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg) { - char* serialized = syncPreSnapshotReply2Str(pMsg); - printf("syncPreSnapshotReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg) { - char* serialized = syncPreSnapshotReply2Str(pMsg); - printf("syncPreSnapshotReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg) { - char* serialized = syncPreSnapshotReply2Str(pMsg); - sTrace("syncPreSnapshotReplyLog | len:%d | %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncPreSnapshotReply2Str(pMsg); - sTrace("syncPreSnapshotReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } + SyncPreSnapshotReply* pPreSnapshotReply = pMsg->pCont; + pPreSnapshotReply->bytes = bytes; + pPreSnapshotReply->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY; + pPreSnapshotReply->vgId = vgId; + return 0; } // ---- message process SyncApplyMsg---- diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 41dc3d3c395b09c96a33ea35f109153ae3a5247d..256be35f052d73f8a61628337353095778c3dd66 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -903,60 +903,3 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { return 0; } - -int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) { - syncLogRecvSyncPreSnapshot(ths, pMsg, ""); - - SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId); - pMsgReply->srcId = ths->myRaftId; - pMsgReply->destId = pMsg->srcId; - pMsgReply->term = ths->pRaftStore->currentTerm; - - SSyncLogStoreData *pData = ths->pLogStore->data; - SWal *pWal = pData->pWal; - - if (syncNodeIsMnode(ths)) { - pMsgReply->snapStart = SYNC_INDEX_BEGIN; - - } else { - bool isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore); - int64_t walCommitVer = walGetCommittedVer(pWal); - - if (!isEmpty && ths->commitIndex != walCommitVer) { - sNError(ths, "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", walCommitVer, - ths->commitIndex); - goto _IGNORE; - } - - pMsgReply->snapStart = ths->commitIndex + 1; - - // make local log clean - int32_t code = ths->pLogStore->syncLogTruncate(ths->pLogStore, pMsgReply->snapStart); - if (code != 0) { - sNError(ths, "truncate wal error"); - goto _IGNORE; - } - } - - // can not write behind _RESPONSE - SRpcMsg rpcMsg; - -_RESPONSE: - syncPreSnapshotReply2RpcMsg(pMsgReply, &rpcMsg); - syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); - - syncPreSnapshotReplyDestroy(pMsgReply); - return 0; - -_IGNORE: - syncPreSnapshotReplyDestroy(pMsgReply); - return 0; -} - -int32_t syncNodeOnPreSnapshotReply(SSyncNode *ths, SyncPreSnapshotReply *pMsg) { - syncLogRecvSyncPreSnapshotReply(ths, pMsg, ""); - - // start snapshot - - return 0; -} \ No newline at end of file diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index f853e7c823081317add6021056ab63a93a657ba3..0df4c69d64c9f9c8da498b1a06c8d4a771b0d799 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -349,6 +349,46 @@ void syncHeartbeatReplyPrint2(char* s, const SyncHeartbeatReply* pMsg); void syncHeartbeatReplyLog(const SyncHeartbeatReply* pMsg); void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg); +SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId); +void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg); +void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen); +void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg); +char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len); +SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len); +void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg); +void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg); +SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg); +char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg); + +// for debug ---------------------- +void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg); +void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg); +void syncPreSnapshotLog(const SyncPreSnapshot* pMsg); +void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg); + +SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId); +void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg); +void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen); +void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg); +char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len); +SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len); +void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg); +void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg); +SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg); +char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg); + +// for debug ---------------------- +void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg); +void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg); +void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg); +void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg); + +// --------------------------------------------- +int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg); +int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index fe4f78976fa8a68195e55fc1bc1c22450b365810..d1125c66d93835e52d91cc40f0fcd1a88c204adc 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -1942,3 +1942,300 @@ void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg) { taosMemoryFree(serialized); } } + +// ---- message process SyncPreSnapshot---- +SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId) { + uint32_t bytes = sizeof(SyncPreSnapshot); + SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT; + return pMsg; +} + +void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncPreSnapshotSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncPreSnapshotDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncPreSnapshotSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg) { + syncPreSnapshotDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncPreSnapshot* pMsg = syncPreSnapshotDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPreSnapshot", pRoot); + return pJson; +} + +char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg) { + cJSON* pJson = syncPreSnapshot2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg) { + char* serialized = syncPreSnapshot2Str(pMsg); + printf("syncPreSnapshotPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg) { + char* serialized = syncPreSnapshot2Str(pMsg); + printf("syncPreSnapshotPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPreSnapshotLog(const SyncPreSnapshot* pMsg) { + char* serialized = syncPreSnapshot2Str(pMsg); + sTrace("syncPreSnapshotLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncPreSnapshot2Str(pMsg); + sTrace("syncPreSnapshotLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} + +// ---- message process SyncPreSnapshotReply---- +SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId) { + uint32_t bytes = sizeof(SyncPreSnapshotReply); + SyncPreSnapshotReply* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY; + return pMsg; +} + +void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncPreSnapshotReplySerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncPreSnapshotReply* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncPreSnapshotReplyDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncPreSnapshotReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg) { + syncPreSnapshotReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncPreSnapshotReply* pMsg = syncPreSnapshotReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapStart); + cJSON_AddStringToObject(pRoot, "snap-start", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPreSnapshotReply", pRoot); + return pJson; +} + +char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg) { + cJSON* pJson = syncPreSnapshotReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg) { + char* serialized = syncPreSnapshotReply2Str(pMsg); + printf("syncPreSnapshotReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg) { + char* serialized = syncPreSnapshotReply2Str(pMsg); + printf("syncPreSnapshotReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg) { + char* serialized = syncPreSnapshotReply2Str(pMsg); + sTrace("syncPreSnapshotReplyLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncPreSnapshotReply2Str(pMsg); + sTrace("syncPreSnapshotReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} diff --git a/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c index fdaf4b8b341ccc61b645eabeb433e57a11617556..82e23b2885b752712aa0ef5a6740f5149a0118b1 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c @@ -130,3 +130,60 @@ char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON_Delete(pJson); return serialized; } + +int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) { + syncLogRecvSyncPreSnapshot(ths, pMsg, ""); + + SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId); + pMsgReply->srcId = ths->myRaftId; + pMsgReply->destId = pMsg->srcId; + pMsgReply->term = ths->pRaftStore->currentTerm; + + SSyncLogStoreData *pData = ths->pLogStore->data; + SWal *pWal = pData->pWal; + + if (syncNodeIsMnode(ths)) { + pMsgReply->snapStart = SYNC_INDEX_BEGIN; + + } else { + bool isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore); + int64_t walCommitVer = walGetCommittedVer(pWal); + + if (!isEmpty && ths->commitIndex != walCommitVer) { + sNError(ths, "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", walCommitVer, + ths->commitIndex); + goto _IGNORE; + } + + pMsgReply->snapStart = ths->commitIndex + 1; + + // make local log clean + int32_t code = ths->pLogStore->syncLogTruncate(ths->pLogStore, pMsgReply->snapStart); + if (code != 0) { + sNError(ths, "truncate wal error"); + goto _IGNORE; + } + } + + // can not write behind _RESPONSE + SRpcMsg rpcMsg; + +_RESPONSE: + syncPreSnapshotReply2RpcMsg(pMsgReply, &rpcMsg); + syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + + syncPreSnapshotReplyDestroy(pMsgReply); + return 0; + +_IGNORE: + syncPreSnapshotReplyDestroy(pMsgReply); + return 0; +} + +int32_t syncNodeOnPreSnapshotReply(SSyncNode *ths, SyncPreSnapshotReply *pMsg) { + syncLogRecvSyncPreSnapshotReply(ths, pMsg, ""); + + // start snapshot + + return 0; +} \ No newline at end of file