提交 1336ed11 编写于 作者: S Shengliang Guan

refact: adjust sync ping message handler

上级 57f1b593
......@@ -34,27 +34,12 @@ typedef struct SyncPing {
char data[];
} SyncPing;
SyncPing* syncPingBuild(uint32_t dataLen);
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str);
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId);
void syncPingDestroy(SyncPing* pMsg);
void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen);
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg);
char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len);
SyncPing* syncPingDeserialize2(const char* buf, uint32_t len);
int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen);
SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen);
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg);
void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg);
SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncPing2Json(const SyncPing* pMsg);
char* syncPing2Str(const SyncPing* pMsg);
// for debug ----------------------
void syncPingPrint(const SyncPing* pMsg);
void syncPingPrint2(char* s, const SyncPing* pMsg);
void syncPingLog(const SyncPing* pMsg);
void syncPingLog2(char* s, const SyncPing* pMsg);
// ---------------------------------------------
typedef struct SyncPingReply {
......
......@@ -151,31 +151,6 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) {
}
}
// ---- message process SyncPing----
SyncPing* syncPingBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncPing) + dataLen;
SyncPing* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_SYNC_PING;
pMsg->dataLen = dataLen;
return pMsg;
}
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) {
uint32_t dataLen = strlen(str) + 1;
SyncPing* pMsg = syncPingBuild(dataLen);
pMsg->vgId = vgId;
pMsg->srcId = *srcId;
pMsg->destId = *destId;
snprintf(pMsg->data, pMsg->dataLen, "%s", str);
return pMsg;
}
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) {
SyncPing* pMsg = syncPingBuild2(srcId, destId, vgId, "ping");
return pMsg;
}
void syncPingDestroy(SyncPing* pMsg) {
if (pMsg != NULL) {
......@@ -194,16 +169,6 @@ void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) {
ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen);
}
char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncPingSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncPing* pMsg = taosMemoryMalloc(bytes);
......@@ -213,117 +178,6 @@ SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) {
return pMsg;
}
int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) {
return -1;
}
if (tEncodeU32(&encoder, pMsg->bytes) < 0) {
return -1;
}
if (tEncodeI32(&encoder, pMsg->vgId) < 0) {
return -1;
}
if (tEncodeU32(&encoder, pMsg->msgType) < 0) {
return -1;
}
if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) {
return -1;
}
if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) {
return -1;
}
if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) {
return -1;
}
if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) {
return -1;
}
if (tEncodeU32(&encoder, pMsg->dataLen) < 0) {
return -1;
}
if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) {
return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) {
return NULL;
}
SyncPing* pMsg = NULL;
uint32_t bytes;
if (tDecodeU32(&decoder, &bytes) < 0) {
return NULL;
}
pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
pMsg->bytes = bytes;
if (tDecodeI32(&decoder, &pMsg->vgId) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
if (tDecodeU32(&decoder, &pMsg->msgType) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
uint32_t len;
char* data = NULL;
if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
ASSERT(len == pMsg->dataLen);
memcpy(pMsg->data, data, len);
tEndDecode(&decoder);
tDecoderClear(&decoder);
return pMsg;
}
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) {
syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncPing* pMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
......@@ -331,96 +185,6 @@ SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) {
return pMsg;
}
cJSON* syncPing2Json(const SyncPing* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
cJSON_AddStringToObject(pDestId, "addr", u64buf);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
char* s;
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
taosMemoryFree(s);
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
taosMemoryFree(s);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncPing", pRoot);
return pJson;
}
char* syncPing2Str(const SyncPing* pMsg) {
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncPingPrint(const SyncPing* pMsg) {
char* serialized = syncPing2Str(pMsg);
printf("syncPingPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncPingPrint2(char* s, const SyncPing* pMsg) {
char* serialized = syncPing2Str(pMsg);
printf("syncPingPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncPingLog(const SyncPing* pMsg) {
char* serialized = syncPing2Str(pMsg);
sTrace("syncPingLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncPingLog2(char* s, const SyncPing* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncPing2Str(pMsg);
sTrace("syncPingLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
// ---- message process SyncPingReply----
SyncPingReply* syncPingReplyBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncPingReply) + dataLen;
......
......@@ -125,6 +125,23 @@ void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
void syncRpcMsgLog(SRpcMsg* pMsg);
void syncRpcMsgLog2(char* s, SRpcMsg* pMsg);
// origin syncMessage
SyncPing* syncPingBuild(uint32_t dataLen);
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str);
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId);
char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len);
int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen);
SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen);
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg);
void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg);
cJSON* syncPing2Json(const SyncPing* pMsg);
char* syncPing2Str(const SyncPing* pMsg);
void syncPingPrint(const SyncPing* pMsg);
void syncPingPrint2(char* s, const SyncPing* pMsg);
void syncPingLog(const SyncPing* pMsg);
void syncPingLog2(char* s, const SyncPing* pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -16,6 +16,244 @@
#define _DEFAULT_SOURCE
#include "syncTest.h"
// ---- message process SyncPing----
SyncPing* syncPingBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncPing) + dataLen;
SyncPing* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_SYNC_PING;
pMsg->dataLen = dataLen;
return pMsg;
}
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) {
uint32_t dataLen = strlen(str) + 1;
SyncPing* pMsg = syncPingBuild(dataLen);
pMsg->vgId = vgId;
pMsg->srcId = *srcId;
pMsg->destId = *destId;
snprintf(pMsg->data, pMsg->dataLen, "%s", str);
return pMsg;
}
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) {
SyncPing* pMsg = syncPingBuild2(srcId, destId, vgId, "ping");
return pMsg;
}
char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncPingSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) {
syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) {
return NULL;
}
SyncPing* pMsg = NULL;
uint32_t bytes;
if (tDecodeU32(&decoder, &bytes) < 0) {
return NULL;
}
pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
pMsg->bytes = bytes;
if (tDecodeI32(&decoder, &pMsg->vgId) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
if (tDecodeU32(&decoder, &pMsg->msgType) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
uint32_t len;
char* data = NULL;
if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
taosMemoryFree(pMsg);
return NULL;
}
ASSERT(len == pMsg->dataLen);
memcpy(pMsg->data, data, len);
tEndDecode(&decoder);
tDecoderClear(&decoder);
return pMsg;
}
int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) {
return -1;
}
if (tEncodeU32(&encoder, pMsg->bytes) < 0) {
return -1;
}
if (tEncodeI32(&encoder, pMsg->vgId) < 0) {
return -1;
}
if (tEncodeU32(&encoder, pMsg->msgType) < 0) {
return -1;
}
if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) {
return -1;
}
if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) {
return -1;
}
if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) {
return -1;
}
if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) {
return -1;
}
if (tEncodeU32(&encoder, pMsg->dataLen) < 0) {
return -1;
}
if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) {
return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
cJSON* syncPing2Json(const SyncPing* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
cJSON_AddStringToObject(pDestId, "addr", u64buf);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
char* s;
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
taosMemoryFree(s);
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
taosMemoryFree(s);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncPing", pRoot);
return pJson;
}
char* syncPing2Str(const SyncPing* pMsg) {
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncPingPrint(const SyncPing* pMsg) {
char* serialized = syncPing2Str(pMsg);
printf("syncPingPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncPingPrint2(char* s, const SyncPing* pMsg) {
char* serialized = syncPing2Str(pMsg);
printf("syncPingPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncPingLog(const SyncPing* pMsg) {
char* serialized = syncPing2Str(pMsg);
sTrace("syncPingLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncPingLog2(char* s, const SyncPing* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncPing2Str(pMsg);
sTrace("syncPingLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
// ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
cJSON* pRoot;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册