/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #define _DEFAULT_SOURCE #include "syncMessage.h" #include "syncRaftCfg.h" #include "syncRaftEntry.h" #include "syncUtil.h" #include "tcoding.h" int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, SSyncNode* pNode) { int32_t bytes = sizeof(SyncTimeout); pTimeoutRpcMsg->pCont = rpcMallocCont(bytes); pTimeoutRpcMsg->msgType = TDMT_SYNC_TIMEOUT; pTimeoutRpcMsg->contLen = bytes; if (pTimeoutRpcMsg->pCont == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } SyncTimeout* pTimeout = pTimeoutRpcMsg->pCont; pTimeout->bytes = bytes; pTimeout->msgType = TDMT_SYNC_TIMEOUT; pTimeout->vgId = pNode->vgId; pTimeout->timeoutType = timeoutType; pTimeout->logicClock = logicClock; pTimeout->timerMS = timerMS; pTimeout->data = pNode; return 0; } int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, int32_t vgId) { int32_t bytes = sizeof(SyncClientRequest) + pOriginalRpcMsg->contLen; pClientRequestRpcMsg->pCont = rpcMallocCont(bytes); pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; pClientRequestRpcMsg->contLen = bytes; if (pClientRequestRpcMsg->pCont == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } SyncClientRequest* pClientRequest = pClientRequestRpcMsg->pCont; pClientRequest->bytes = bytes; pClientRequest->vgId = vgId; pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST; pClientRequest->originalRpcType = pOriginalRpcMsg->msgType; pClientRequest->seqNum = seqNum; pClientRequest->isWeak = isWeak; pClientRequest->dataLen = pOriginalRpcMsg->contLen; memcpy(pClientRequest->data, (char*)pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); return 0; } int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId) { int32_t bytes = sizeof(SyncClientRequest) + pEntry->bytes; pClientRequestRpcMsg->pCont = rpcMallocCont(bytes); pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; pClientRequestRpcMsg->contLen = bytes; if (pClientRequestRpcMsg->pCont == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } SyncClientRequest* pClientRequest = pClientRequestRpcMsg->pCont; pClientRequest->bytes = bytes; pClientRequest->vgId = vgId; pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST; pClientRequest->originalRpcType = TDMT_SYNC_NOOP; pClientRequest->dataLen = pEntry->bytes; memcpy(pClientRequest->data, (char*)pEntry, pEntry->bytes); return 0; } int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) { int32_t bytes = sizeof(SyncRequestVote); pMsg->pCont = rpcMallocCont(bytes); pMsg->msgType = TDMT_SYNC_REQUEST_VOTE; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { terrno = TDMT_SYNC_REQUEST_VOTE; return -1; } SyncRequestVote* pRequestVote = pMsg->pCont; pRequestVote->bytes = bytes; pRequestVote->msgType = TDMT_SYNC_REQUEST_VOTE; pRequestVote->vgId = vgId; return 0; } int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) { int32_t bytes = sizeof(SyncRequestVoteReply); pMsg->pCont = rpcMallocCont(bytes); pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { terrno = TDMT_SYNC_REQUEST_VOTE; return -1; } SyncRequestVoteReply* pRequestVoteReply = pMsg->pCont; pRequestVoteReply->bytes = bytes; pRequestVoteReply->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY; pRequestVoteReply->vgId = vgId; return 0; } // ---- message process SyncAppendEntries---- SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen, int32_t vgId) { uint32_t bytes = sizeof(SyncAppendEntries) + dataLen; SyncAppendEntries* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES; pMsg->dataLen = dataLen; return pMsg; } void syncAppendEntriesDestroy(SyncAppendEntries* pMsg) { if (pMsg != NULL) { taosMemoryFree(pMsg); } } void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen) { ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg) { memcpy(pMsg, buf, len); ASSERT(len == pMsg->bytes); ASSERT(pMsg->bytes == sizeof(SyncAppendEntries) + pMsg->dataLen); } char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); ASSERT(buf != NULL); syncAppendEntriesSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; } return buf; } SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncAppendEntries* pMsg = taosMemoryMalloc(bytes); ASSERT(pMsg != NULL); syncAppendEntriesDeserialize(buf, len, pMsg); ASSERT(len == pMsg->bytes); return pMsg; } void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); syncAppendEntriesSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); } void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg) { syncAppendEntriesDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncAppendEntries* pMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); ASSERT(pMsg != NULL); return pMsg; } cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { char u64buf[128] = {0}; cJSON* pRoot = cJSON_CreateObject(); if (pMsg != NULL) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; cJSON* pTmp = pSrcId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; cJSON* pTmp = pDestId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->prevLogIndex); cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->prevLogTerm); cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex); cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); char* s; s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); cJSON_AddStringToObject(pRoot, "data", s); taosMemoryFree(s); s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); cJSON_AddStringToObject(pRoot, "data2", s); taosMemoryFree(s); } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncAppendEntries", pRoot); return pJson; } char* syncAppendEntries2Str(const SyncAppendEntries* pMsg) { cJSON* pJson = syncAppendEntries2Json(pMsg); char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } // for debug ---------------------- void syncAppendEntriesPrint(const SyncAppendEntries* pMsg) { char* serialized = syncAppendEntries2Str(pMsg); printf("syncAppendEntriesPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); fflush(NULL); taosMemoryFree(serialized); } void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg) { char* serialized = syncAppendEntries2Str(pMsg); printf("syncAppendEntriesPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); fflush(NULL); taosMemoryFree(serialized); } void syncAppendEntriesLog(const SyncAppendEntries* pMsg) { char* serialized = syncAppendEntries2Str(pMsg); sTrace("syncAppendEntriesLog | len:%d | %s", (int32_t)strlen(serialized), serialized); taosMemoryFree(serialized); } void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) { if (gRaftDetailLog) { char* serialized = syncAppendEntries2Str(pMsg); sTrace("syncAppendEntriesLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); taosMemoryFree(serialized); } } // ---- message process SyncAppendEntriesReply---- SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncAppendEntriesReply); SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY; return pMsg; } void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) { if (pMsg != NULL) { taosMemoryFree(pMsg); } } void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen) { ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg) { memcpy(pMsg, buf, len); ASSERT(len == pMsg->bytes); } char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); ASSERT(buf != NULL); syncAppendEntriesReplySerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; } return buf; } SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes); ASSERT(pMsg != NULL); syncAppendEntriesReplyDeserialize(buf, len, pMsg); ASSERT(len == pMsg->bytes); return pMsg; } void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); syncAppendEntriesReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); } void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg) { syncAppendEntriesReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); ASSERT(pMsg != NULL); return pMsg; } cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { char u64buf[128] = {0}; cJSON* pRoot = cJSON_CreateObject(); if (pMsg != NULL) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; cJSON* pTmp = pSrcId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; cJSON* pTmp = pDestId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddNumberToObject(pRoot, "success", pMsg->success); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex); cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime); cJSON_AddStringToObject(pRoot, "startTime", u64buf); } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot); return pJson; } char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg) { cJSON* pJson = syncAppendEntriesReply2Json(pMsg); char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } // for debug ---------------------- void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg) { char* serialized = syncAppendEntriesReply2Str(pMsg); printf("syncAppendEntriesReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); fflush(NULL); taosMemoryFree(serialized); } void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg) { char* serialized = syncAppendEntriesReply2Str(pMsg); printf("syncAppendEntriesReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); fflush(NULL); taosMemoryFree(serialized); } void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg) { char* serialized = syncAppendEntriesReply2Str(pMsg); sTrace("syncAppendEntriesReplyLog | len:%d| %s", (int32_t)strlen(serialized), serialized); taosMemoryFree(serialized); } void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) { if (gRaftDetailLog) { char* serialized = syncAppendEntriesReply2Str(pMsg); sTrace("syncAppendEntriesReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); taosMemoryFree(serialized); } } // ---- message process SyncHeartbeat---- SyncHeartbeat* syncHeartbeatBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncHeartbeat); SyncHeartbeat* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; pMsg->msgType = TDMT_SYNC_HEARTBEAT; return pMsg; } void syncHeartbeatDestroy(SyncHeartbeat* pMsg) { if (pMsg != NULL) { taosMemoryFree(pMsg); } } void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen) { ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg) { memcpy(pMsg, buf, len); ASSERT(len == pMsg->bytes); } char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); ASSERT(buf != NULL); syncHeartbeatSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; } return buf; } SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncHeartbeat* pMsg = taosMemoryMalloc(bytes); ASSERT(pMsg != NULL); syncHeartbeatDeserialize(buf, len, pMsg); ASSERT(len == pMsg->bytes); return pMsg; } void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); syncHeartbeatSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); } void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg) { syncHeartbeatDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncHeartbeat* pMsg = syncHeartbeatDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); ASSERT(pMsg != NULL); return pMsg; } cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg) { char u64buf[128] = {0}; cJSON* pRoot = cJSON_CreateObject(); if (pMsg != NULL) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; cJSON* pTmp = pSrcId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; cJSON* pTmp = pDestId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex); cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncHeartbeat", pRoot); return pJson; } char* syncHeartbeat2Str(const SyncHeartbeat* pMsg) { cJSON* pJson = syncHeartbeat2Json(pMsg); char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } void syncHeartbeatPrint(const SyncHeartbeat* pMsg) { char* serialized = syncHeartbeat2Str(pMsg); printf("syncHeartbeatPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); fflush(NULL); taosMemoryFree(serialized); } void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg) { char* serialized = syncHeartbeat2Str(pMsg); printf("syncHeartbeatPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); fflush(NULL); taosMemoryFree(serialized); } void syncHeartbeatLog(const SyncHeartbeat* pMsg) { char* serialized = syncHeartbeat2Str(pMsg); sTrace("syncHeartbeatLog | len:%d | %s", (int32_t)strlen(serialized), serialized); taosMemoryFree(serialized); } void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg) { if (gRaftDetailLog) { char* serialized = syncHeartbeat2Str(pMsg); sTrace("syncHeartbeatLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); taosMemoryFree(serialized); } } // ---- message process SyncHeartbeatReply---- SyncHeartbeatReply* syncHeartbeatReplyBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncHeartbeatReply); SyncHeartbeatReply* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; pMsg->msgType = TDMT_SYNC_HEARTBEAT_REPLY; return pMsg; } void syncHeartbeatReplyDestroy(SyncHeartbeatReply* pMsg) { if (pMsg != NULL) { taosMemoryFree(pMsg); } } void syncHeartbeatReplySerialize(const SyncHeartbeatReply* pMsg, char* buf, uint32_t bufLen) { ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncHeartbeatReplyDeserialize(const char* buf, uint32_t len, SyncHeartbeatReply* pMsg) { memcpy(pMsg, buf, len); ASSERT(len == pMsg->bytes); } char* syncHeartbeatReplySerialize2(const SyncHeartbeatReply* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); ASSERT(buf != NULL); syncHeartbeatReplySerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; } return buf; } SyncHeartbeatReply* syncHeartbeatReplyDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncHeartbeatReply* pMsg = taosMemoryMalloc(bytes); ASSERT(pMsg != NULL); syncHeartbeatReplyDeserialize(buf, len, pMsg); ASSERT(len == pMsg->bytes); return pMsg; } void syncHeartbeatReply2RpcMsg(const SyncHeartbeatReply* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); syncHeartbeatReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); } void syncHeartbeatReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeatReply* pMsg) { syncHeartbeatReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } SyncHeartbeatReply* syncHeartbeatReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncHeartbeatReply* pMsg = syncHeartbeatReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); ASSERT(pMsg != NULL); return pMsg; } cJSON* syncHeartbeatReply2Json(const SyncHeartbeatReply* pMsg) { char u64buf[128] = {0}; cJSON* pRoot = cJSON_CreateObject(); if (pMsg != NULL) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; cJSON* pTmp = pSrcId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; cJSON* pTmp = pDestId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime); cJSON_AddStringToObject(pRoot, "startTime", u64buf); } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncHeartbeatReply", pRoot); return pJson; } char* syncHeartbeatReply2Str(const SyncHeartbeatReply* pMsg) { cJSON* pJson = syncHeartbeatReply2Json(pMsg); char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } void syncHeartbeatReplyPrint(const SyncHeartbeatReply* pMsg) { char* serialized = syncHeartbeatReply2Str(pMsg); printf("syncHeartbeatReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); fflush(NULL); taosMemoryFree(serialized); } void syncHeartbeatReplyPrint2(char* s, const SyncHeartbeatReply* pMsg) { char* serialized = syncHeartbeatReply2Str(pMsg); printf("syncHeartbeatReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); fflush(NULL); taosMemoryFree(serialized); } void syncHeartbeatReplyLog(const SyncHeartbeatReply* pMsg) { char* serialized = syncHeartbeatReply2Str(pMsg); sTrace("syncHeartbeatReplyLog | len:%d | %s", (int32_t)strlen(serialized), serialized); taosMemoryFree(serialized); } void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg) { if (gRaftDetailLog) { char* serialized = syncHeartbeatReply2Str(pMsg); sTrace("syncHeartbeatReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); taosMemoryFree(serialized); } } // ---- message process SyncPreSnapshot---- SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncPreSnapshot); SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT; return pMsg; } void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg) { if (pMsg != NULL) { taosMemoryFree(pMsg); } } void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen) { ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg) { memcpy(pMsg, buf, len); ASSERT(len == pMsg->bytes); } char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); ASSERT(buf != NULL); syncPreSnapshotSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; } return buf; } SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes); ASSERT(pMsg != NULL); syncPreSnapshotDeserialize(buf, len, pMsg); ASSERT(len == pMsg->bytes); return pMsg; } void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); syncPreSnapshotSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); } void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg) { syncPreSnapshotDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncPreSnapshot* pMsg = syncPreSnapshotDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); ASSERT(pMsg != NULL); return pMsg; } cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg) { char u64buf[128] = {0}; cJSON* pRoot = cJSON_CreateObject(); if (pMsg != NULL) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; cJSON* pTmp = pSrcId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; cJSON* pTmp = pDestId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncPreSnapshot", pRoot); return pJson; } char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg) { cJSON* pJson = syncPreSnapshot2Json(pMsg); char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg) { char* serialized = syncPreSnapshot2Str(pMsg); printf("syncPreSnapshotPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); fflush(NULL); taosMemoryFree(serialized); } void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg) { char* serialized = syncPreSnapshot2Str(pMsg); printf("syncPreSnapshotPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); fflush(NULL); taosMemoryFree(serialized); } void syncPreSnapshotLog(const SyncPreSnapshot* pMsg) { char* serialized = syncPreSnapshot2Str(pMsg); sTrace("syncPreSnapshotLog | len:%d | %s", (int32_t)strlen(serialized), serialized); taosMemoryFree(serialized); } void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg) { if (gRaftDetailLog) { char* serialized = syncPreSnapshot2Str(pMsg); sTrace("syncPreSnapshotLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); taosMemoryFree(serialized); } } // ---- message process SyncPreSnapshotReply---- SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncPreSnapshotReply); SyncPreSnapshotReply* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY; return pMsg; } void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg) { if (pMsg != NULL) { taosMemoryFree(pMsg); } } void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen) { ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg) { memcpy(pMsg, buf, len); ASSERT(len == pMsg->bytes); } char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); ASSERT(buf != NULL); syncPreSnapshotReplySerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; } return buf; } SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncPreSnapshotReply* pMsg = taosMemoryMalloc(bytes); ASSERT(pMsg != NULL); syncPreSnapshotReplyDeserialize(buf, len, pMsg); ASSERT(len == pMsg->bytes); return pMsg; } void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); syncPreSnapshotReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); } void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg) { syncPreSnapshotReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncPreSnapshotReply* pMsg = syncPreSnapshotReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); ASSERT(pMsg != NULL); return pMsg; } cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg) { char u64buf[128] = {0}; cJSON* pRoot = cJSON_CreateObject(); if (pMsg != NULL) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; cJSON* pTmp = pSrcId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; cJSON* pTmp = pDestId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapStart); cJSON_AddStringToObject(pRoot, "snap-start", u64buf); } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncPreSnapshotReply", pRoot); return pJson; } char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg) { cJSON* pJson = syncPreSnapshotReply2Json(pMsg); char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg) { char* serialized = syncPreSnapshotReply2Str(pMsg); printf("syncPreSnapshotReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); fflush(NULL); taosMemoryFree(serialized); } void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg) { char* serialized = syncPreSnapshotReply2Str(pMsg); printf("syncPreSnapshotReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); fflush(NULL); taosMemoryFree(serialized); } void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg) { char* serialized = syncPreSnapshotReply2Str(pMsg); sTrace("syncPreSnapshotReplyLog | len:%d | %s", (int32_t)strlen(serialized), serialized); taosMemoryFree(serialized); } void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg) { if (gRaftDetailLog) { char* serialized = syncPreSnapshotReply2Str(pMsg); sTrace("syncPreSnapshotReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); taosMemoryFree(serialized); } } // ---- message process SyncApplyMsg---- SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) { uint32_t bytes = sizeof(SyncApplyMsg) + dataLen; SyncApplyMsg* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->msgType = TDMT_SYNC_APPLY_MSG; pMsg->dataLen = dataLen; return pMsg; } SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta) { SyncApplyMsg* pMsg = syncApplyMsgBuild(pOriginalRpcMsg->contLen); pMsg->vgId = vgId; pMsg->originalRpcType = pOriginalRpcMsg->msgType; pMsg->fsmMeta = *pMeta; memcpy(pMsg->data, pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); return pMsg; } void syncApplyMsgDestroy(SyncApplyMsg* pMsg) { if (pMsg != NULL) { taosMemoryFree(pMsg); } } void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen) { ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg) { memcpy(pMsg, buf, len); ASSERT(len == pMsg->bytes); } char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); ASSERT(buf != NULL); syncApplyMsgSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; } return buf; } SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncApplyMsg* pMsg = taosMemoryMalloc(bytes); ASSERT(pMsg != NULL); syncApplyMsgDeserialize(buf, len, pMsg); ASSERT(len == pMsg->bytes); return pMsg; } // SyncApplyMsg to SRpcMsg, put it into ApplyQ void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); syncApplyMsgSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); } // get SRpcMsg from ApplyQ, to SyncApplyMsg void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg) { syncApplyMsgDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncApplyMsg* pMsg = syncApplyMsgDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); return pMsg; } // SyncApplyMsg to OriginalRpcMsg void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg) { memset(pOriginalRpcMsg, 0, sizeof(*pOriginalRpcMsg)); pOriginalRpcMsg->msgType = pMsg->originalRpcType; pOriginalRpcMsg->contLen = pMsg->dataLen; pOriginalRpcMsg->pCont = rpcMallocCont(pOriginalRpcMsg->contLen); memcpy(pOriginalRpcMsg->pCont, pMsg->data, pOriginalRpcMsg->contLen); } cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg) { char u64buf[128] = {0}; cJSON* pRoot = cJSON_CreateObject(); if (pMsg != NULL) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fsmMeta.index); cJSON_AddStringToObject(pRoot, "fsmMeta.index", u64buf); cJSON_AddNumberToObject(pRoot, "fsmMeta.isWeak", pMsg->fsmMeta.isWeak); cJSON_AddNumberToObject(pRoot, "fsmMeta.code", pMsg->fsmMeta.code); cJSON_AddNumberToObject(pRoot, "fsmMeta.state", pMsg->fsmMeta.state); cJSON_AddStringToObject(pRoot, "fsmMeta.state.str", syncStr(pMsg->fsmMeta.state)); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->fsmMeta.seqNum); cJSON_AddStringToObject(pRoot, "fsmMeta.seqNum", u64buf); cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); char* s; s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); cJSON_AddStringToObject(pRoot, "data", s); taosMemoryFree(s); s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); cJSON_AddStringToObject(pRoot, "data2", s); taosMemoryFree(s); } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncApplyMsg", pRoot); return pJson; } char* syncApplyMsg2Str(const SyncApplyMsg* pMsg) { cJSON* pJson = syncApplyMsg2Json(pMsg); char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } // for debug ---------------------- void syncApplyMsgPrint(const SyncApplyMsg* pMsg) { char* serialized = syncApplyMsg2Str(pMsg); printf("syncApplyMsgPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); fflush(NULL); taosMemoryFree(serialized); } void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg) { char* serialized = syncApplyMsg2Str(pMsg); printf("syncApplyMsgPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); fflush(NULL); taosMemoryFree(serialized); } void syncApplyMsgLog(const SyncApplyMsg* pMsg) { char* serialized = syncApplyMsg2Str(pMsg); sTrace("ssyncApplyMsgLog | len:%d | %s", (int32_t)strlen(serialized), serialized); taosMemoryFree(serialized); } void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg) { if (gRaftDetailLog) { char* serialized = syncApplyMsg2Str(pMsg); sTrace("syncApplyMsgLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); taosMemoryFree(serialized); } } // --------------------------------------------- SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId) { uint32_t bytes = sizeof(SyncSnapshotSend) + dataLen; SyncSnapshotSend* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND; pMsg->dataLen = dataLen; return pMsg; } void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg) { if (pMsg != NULL) { taosMemoryFree(pMsg); } } void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen) { ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg) { memcpy(pMsg, buf, len); ASSERT(len == pMsg->bytes); ASSERT(pMsg->bytes == sizeof(SyncSnapshotSend) + pMsg->dataLen); } char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); ASSERT(buf != NULL); syncSnapshotSendSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; } return buf; } SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncSnapshotSend* pMsg = taosMemoryMalloc(bytes); ASSERT(pMsg != NULL); syncSnapshotSendDeserialize(buf, len, pMsg); ASSERT(len == pMsg->bytes); return pMsg; } void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); syncSnapshotSendSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); } void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg) { syncSnapshotSendDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncSnapshotSend* pMsg = syncSnapshotSendDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); ASSERT(pMsg != NULL); return pMsg; } SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncSnapshotRsp); SyncSnapshotRsp* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP; return pMsg; } void syncSnapshotRspDestroy(SyncSnapshotRsp* pMsg) { if (pMsg != NULL) { taosMemoryFree(pMsg); } } void syncSnapshotRspSerialize(const SyncSnapshotRsp* pMsg, char* buf, uint32_t bufLen) { ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncSnapshotRspDeserialize(const char* buf, uint32_t len, SyncSnapshotRsp* pMsg) { memcpy(pMsg, buf, len); ASSERT(len == pMsg->bytes); } char* syncSnapshotRspSerialize2(const SyncSnapshotRsp* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); ASSERT(buf != NULL); syncSnapshotRspSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; } return buf; } SyncSnapshotRsp* syncSnapshotRspDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncSnapshotRsp* pMsg = taosMemoryMalloc(bytes); ASSERT(pMsg != NULL); syncSnapshotRspDeserialize(buf, len, pMsg); ASSERT(len == pMsg->bytes); return pMsg; } void syncSnapshotRsp2RpcMsg(const SyncSnapshotRsp* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); syncSnapshotRspSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); } void syncSnapshotRspFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotRsp* pMsg) { syncSnapshotRspDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } SyncSnapshotRsp* syncSnapshotRspFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncSnapshotRsp* pMsg = syncSnapshotRspDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); ASSERT(pMsg != NULL); return pMsg; } cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) { char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); if (pMsg != NULL) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; cJSON* pTmp = pSrcId; char host[128]; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; cJSON* pTmp = pDestId; char host[128]; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime); cJSON_AddStringToObject(pRoot, "startTime", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->lastIndex); cJSON_AddStringToObject(pRoot, "lastIndex", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->lastTerm); cJSON_AddStringToObject(pRoot, "lastTerm", u64buf); cJSON_AddNumberToObject(pRoot, "ack", pMsg->ack); cJSON_AddNumberToObject(pRoot, "code", pMsg->code); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapBeginIndex); cJSON_AddStringToObject(pRoot, "snap-begin", u64buf); } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncSnapshotRsp", pRoot); return pJson; } char* syncSnapshotRsp2Str(const SyncSnapshotRsp* pMsg) { cJSON* pJson = syncSnapshotRsp2Json(pMsg); char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } // for debug ---------------------- void syncSnapshotRspPrint(const SyncSnapshotRsp* pMsg) { char* serialized = syncSnapshotRsp2Str(pMsg); printf("syncSnapshotRspPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); fflush(NULL); taosMemoryFree(serialized); } void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg) { char* serialized = syncSnapshotRsp2Str(pMsg); printf("syncSnapshotRspPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); fflush(NULL); taosMemoryFree(serialized); } void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg) { char* serialized = syncSnapshotRsp2Str(pMsg); sTrace("syncSnapshotRspLog | len:%d | %s", (int32_t)strlen(serialized), serialized); taosMemoryFree(serialized); } void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg) { if (gRaftDetailLog) { char* serialized = syncSnapshotRsp2Str(pMsg); sTrace("syncSnapshotRspLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); taosMemoryFree(serialized); } } // --------------------------------------------- SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncLeaderTransfer); SyncLeaderTransfer* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER; return pMsg; } void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg) { if (pMsg != NULL) { taosMemoryFree(pMsg); } } void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen) { ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg) { memcpy(pMsg, buf, len); ASSERT(len == pMsg->bytes); } char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); ASSERT(buf != NULL); syncLeaderTransferSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; } return buf; } SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncLeaderTransfer* pMsg = taosMemoryMalloc(bytes); ASSERT(pMsg != NULL); syncLeaderTransferDeserialize(buf, len, pMsg); ASSERT(len == pMsg->bytes); return pMsg; } void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); syncLeaderTransferSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); } void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg) { syncLeaderTransferDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncLeaderTransfer* pMsg = syncLeaderTransferDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); ASSERT(pMsg != NULL); return pMsg; } cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg) { char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); if (pMsg != NULL) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); /* cJSON* pSrcId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; cJSON* pTmp = pSrcId; char host[128]; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; cJSON* pTmp = pDestId; char host[128]; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); */ cJSON* pNewerId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->newLeaderId.addr); cJSON_AddStringToObject(pNewerId, "addr", u64buf); { uint64_t u64 = pMsg->newLeaderId.addr; cJSON* pTmp = pNewerId; char host[128]; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pNewerId, "vgId", pMsg->newLeaderId.vgId); cJSON_AddItemToObject(pRoot, "newLeaderId", pNewerId); } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncLeaderTransfer", pRoot); return pJson; } char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg) { cJSON* pJson = syncLeaderTransfer2Json(pMsg); char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } const char* syncLocalCmdGetStr(int32_t cmd) { if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) { return "step-down"; } else if (cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { return "follower-commit"; } return "unknown-local-cmd"; } SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncLocalCmd); SyncLocalCmd* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; pMsg->msgType = TDMT_SYNC_LOCAL_CMD; return pMsg; } void syncLocalCmdDestroy(SyncLocalCmd* pMsg) { if (pMsg != NULL) { taosMemoryFree(pMsg); } } void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen) { ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg) { memcpy(pMsg, buf, len); ASSERT(len == pMsg->bytes); } char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); ASSERT(buf != NULL); syncLocalCmdSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; } return buf; } SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncLocalCmd* pMsg = taosMemoryMalloc(bytes); ASSERT(pMsg != NULL); syncLocalCmdDeserialize(buf, len, pMsg); ASSERT(len == pMsg->bytes); return pMsg; } void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); syncLocalCmdSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); } void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg) { syncLocalCmdDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncLocalCmd* pMsg = syncLocalCmdDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); ASSERT(pMsg != NULL); return pMsg; } cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) { char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); if (pMsg != NULL) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; cJSON* pTmp = pSrcId; char host[128]; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; cJSON* pTmp = pDestId; char host[128]; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); cJSON_AddStringToObject(pTmp, "addr_host", host); cJSON_AddNumberToObject(pTmp, "addr_port", port); } cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); cJSON_AddNumberToObject(pRoot, "cmd", pMsg->cmd); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm); cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex); cJSON_AddStringToObject(pRoot, "fc-index", u64buf); } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncLocalCmd2Json", pRoot); return pJson; } char* syncLocalCmd2Str(const SyncLocalCmd* pMsg) { cJSON* pJson = syncLocalCmd2Json(pMsg); char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } // for debug ---------------------- void syncLocalCmdPrint(const SyncLocalCmd* pMsg) { char* serialized = syncLocalCmd2Str(pMsg); printf("syncLocalCmdPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); fflush(NULL); taosMemoryFree(serialized); } void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg) { char* serialized = syncLocalCmd2Str(pMsg); printf("syncLocalCmdPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); fflush(NULL); taosMemoryFree(serialized); } void syncLocalCmdLog(const SyncLocalCmd* pMsg) { char* serialized = syncLocalCmd2Str(pMsg); sTrace("syncLocalCmdLog | len:%d | %s", (int32_t)strlen(serialized), serialized); taosMemoryFree(serialized); } void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg) { if (gRaftDetailLog) { char* serialized = syncLocalCmd2Str(pMsg); sTrace("syncLocalCmdLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); taosMemoryFree(serialized); } }