提交 d5f7f8c3 编写于 作者: S Shengliang Guan

refact: remove sync apply msg

上级 7bcda46c
......@@ -172,27 +172,6 @@ typedef struct SyncApplyMsg {
char data[]; // user RpcMsg.pCont
} SyncApplyMsg;
SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen);
SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta);
void syncApplyMsgDestroy(SyncApplyMsg* pMsg);
void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen);
void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg);
char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len);
SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len);
void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg); // SyncApplyMsg to SRpcMsg, put it into ApplyQ
void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg); // get SRpcMsg from ApplyQ, to SyncApplyMsg
SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg);
void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg); // SyncApplyMsg to OriginalRpcMsg
cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg);
char* syncApplyMsg2Str(const SyncApplyMsg* pMsg);
// for debug ----------------------
void syncApplyMsgPrint(const SyncApplyMsg* pMsg);
void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg);
void syncApplyMsgLog(const SyncApplyMsg* pMsg);
void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg);
// ---------------------------------------------
typedef struct SyncSnapshotSend {
uint32_t bytes;
int32_t vgId;
......@@ -352,7 +331,7 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType);
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode);
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginalRpc, uint64_t seq, bool isWeak, int32_t vgId);
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seq, bool isWeak, int32_t vgId);
int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId);
......@@ -362,6 +341,7 @@ int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildApplyMsg(SRpcMsg* pMsg, const SRpcMsg* pOriginal, int32_t vgId, SFsmCbMeta* pMeta);
#ifdef __cplusplus
}
......
......@@ -223,159 +223,6 @@ int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) {
return 0;
}
// ---- message process SyncApplyMsg----
SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncApplyMsg) + dataLen;
SyncApplyMsg* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_SYNC_APPLY_MSG;
pMsg->dataLen = dataLen;
return pMsg;
}
SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta) {
SyncApplyMsg* pMsg = syncApplyMsgBuild(pOriginalRpcMsg->contLen);
pMsg->vgId = vgId;
pMsg->originalRpcType = pOriginalRpcMsg->msgType;
pMsg->fsmMeta = *pMeta;
memcpy(pMsg->data, pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen);
return pMsg;
}
void syncApplyMsgDestroy(SyncApplyMsg* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncApplyMsgSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncApplyMsg* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncApplyMsgDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
// SyncApplyMsg to SRpcMsg, put it into ApplyQ
void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncApplyMsgSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
// get SRpcMsg from ApplyQ, to SyncApplyMsg
void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg) {
syncApplyMsgDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncApplyMsg* pMsg = syncApplyMsgDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
return pMsg;
}
// SyncApplyMsg to OriginalRpcMsg
void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg) {
memset(pOriginalRpcMsg, 0, sizeof(*pOriginalRpcMsg));
pOriginalRpcMsg->msgType = pMsg->originalRpcType;
pOriginalRpcMsg->contLen = pMsg->dataLen;
pOriginalRpcMsg->pCont = rpcMallocCont(pOriginalRpcMsg->contLen);
memcpy(pOriginalRpcMsg->pCont, pMsg->data, pOriginalRpcMsg->contLen);
}
cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fsmMeta.index);
cJSON_AddStringToObject(pRoot, "fsmMeta.index", u64buf);
cJSON_AddNumberToObject(pRoot, "fsmMeta.isWeak", pMsg->fsmMeta.isWeak);
cJSON_AddNumberToObject(pRoot, "fsmMeta.code", pMsg->fsmMeta.code);
cJSON_AddNumberToObject(pRoot, "fsmMeta.state", pMsg->fsmMeta.state);
cJSON_AddStringToObject(pRoot, "fsmMeta.state.str", syncStr(pMsg->fsmMeta.state));
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->fsmMeta.seqNum);
cJSON_AddStringToObject(pRoot, "fsmMeta.seqNum", u64buf);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
char* s;
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
taosMemoryFree(s);
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
taosMemoryFree(s);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncApplyMsg", pRoot);
return pJson;
}
char* syncApplyMsg2Str(const SyncApplyMsg* pMsg) {
cJSON* pJson = syncApplyMsg2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncApplyMsgPrint(const SyncApplyMsg* pMsg) {
char* serialized = syncApplyMsg2Str(pMsg);
printf("syncApplyMsgPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg) {
char* serialized = syncApplyMsg2Str(pMsg);
printf("syncApplyMsgPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncApplyMsgLog(const SyncApplyMsg* pMsg) {
char* serialized = syncApplyMsg2Str(pMsg);
sTrace("ssyncApplyMsgLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncApplyMsg2Str(pMsg);
sTrace("syncApplyMsgLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
// ---------------------------------------------
SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId) {
uint32_t bytes = sizeof(SyncSnapshotSend) + dataLen;
......
......@@ -389,6 +389,26 @@ void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg);
int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg);
int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg);
SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen);
SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta);
void syncApplyMsgDestroy(SyncApplyMsg* pMsg);
void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen);
void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg);
char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len);
SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len);
void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg); // SyncApplyMsg to SRpcMsg, put it into ApplyQ
void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg); // get SRpcMsg from ApplyQ, to SyncApplyMsg
SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg);
void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg); // SyncApplyMsg to OriginalRpcMsg
cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg);
char* syncApplyMsg2Str(const SyncApplyMsg* pMsg);
// for debug ----------------------
void syncApplyMsgPrint(const SyncApplyMsg* pMsg);
void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg);
void syncApplyMsgLog(const SyncApplyMsg* pMsg);
void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -2239,3 +2239,156 @@ void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg) {
taosMemoryFree(serialized);
}
}
// ---- message process SyncApplyMsg----
SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncApplyMsg) + dataLen;
SyncApplyMsg* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_SYNC_APPLY_MSG;
pMsg->dataLen = dataLen;
return pMsg;
}
SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta) {
SyncApplyMsg* pMsg = syncApplyMsgBuild(pOriginalRpcMsg->contLen);
pMsg->vgId = vgId;
pMsg->originalRpcType = pOriginalRpcMsg->msgType;
pMsg->fsmMeta = *pMeta;
memcpy(pMsg->data, pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen);
return pMsg;
}
void syncApplyMsgDestroy(SyncApplyMsg* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncApplyMsgSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncApplyMsg* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncApplyMsgDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
// SyncApplyMsg to SRpcMsg, put it into ApplyQ
void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncApplyMsgSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
// get SRpcMsg from ApplyQ, to SyncApplyMsg
void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg) {
syncApplyMsgDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncApplyMsg* pMsg = syncApplyMsgDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
return pMsg;
}
// SyncApplyMsg to OriginalRpcMsg
void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg) {
memset(pOriginalRpcMsg, 0, sizeof(*pOriginalRpcMsg));
pOriginalRpcMsg->msgType = pMsg->originalRpcType;
pOriginalRpcMsg->contLen = pMsg->dataLen;
pOriginalRpcMsg->pCont = rpcMallocCont(pOriginalRpcMsg->contLen);
memcpy(pOriginalRpcMsg->pCont, pMsg->data, pOriginalRpcMsg->contLen);
}
cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fsmMeta.index);
cJSON_AddStringToObject(pRoot, "fsmMeta.index", u64buf);
cJSON_AddNumberToObject(pRoot, "fsmMeta.isWeak", pMsg->fsmMeta.isWeak);
cJSON_AddNumberToObject(pRoot, "fsmMeta.code", pMsg->fsmMeta.code);
cJSON_AddNumberToObject(pRoot, "fsmMeta.state", pMsg->fsmMeta.state);
cJSON_AddStringToObject(pRoot, "fsmMeta.state.str", syncStr(pMsg->fsmMeta.state));
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->fsmMeta.seqNum);
cJSON_AddStringToObject(pRoot, "fsmMeta.seqNum", u64buf);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
char* s;
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
taosMemoryFree(s);
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
taosMemoryFree(s);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncApplyMsg", pRoot);
return pJson;
}
char* syncApplyMsg2Str(const SyncApplyMsg* pMsg) {
cJSON* pJson = syncApplyMsg2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncApplyMsgPrint(const SyncApplyMsg* pMsg) {
char* serialized = syncApplyMsg2Str(pMsg);
printf("syncApplyMsgPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg) {
char* serialized = syncApplyMsg2Str(pMsg);
printf("syncApplyMsgPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncApplyMsgLog(const SyncApplyMsg* pMsg) {
char* serialized = syncApplyMsg2Str(pMsg);
sTrace("ssyncApplyMsgLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncApplyMsg2Str(pMsg);
sTrace("syncApplyMsgLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册