提交 2da7eac6 编写于 作者: S Shengliang Guan

refact: adjust timeout msg build

上级 278cf9d3
...@@ -40,27 +40,9 @@ typedef struct SyncTimeout { ...@@ -40,27 +40,9 @@ typedef struct SyncTimeout {
void* data; // need optimized void* data; // need optimized
} SyncTimeout; } SyncTimeout;
SyncTimeout* syncTimeoutBuild(); int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t vgId, SSyncNode* pNode);
void* data);
void syncTimeoutDestroy(SyncTimeout* pMsg);
void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen);
void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg);
char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len);
SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len);
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg);
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg);
SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncTimeout2Json(const SyncTimeout* pMsg);
char* syncTimeout2Str(const SyncTimeout* pMsg);
// for debug ----------------------
void syncTimeoutPrint(const SyncTimeout* pMsg);
void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg);
void syncTimeoutLog(const SyncTimeout* pMsg);
void syncTimeoutLog2(char* s, const SyncTimeout* pMsg);
// ---------------------------------------------
typedef struct SyncClientRequest { typedef struct SyncClientRequest {
uint32_t bytes; uint32_t bytes;
int32_t vgId; int32_t vgId;
...@@ -579,7 +561,7 @@ typedef struct SyncLocalCmd { ...@@ -579,7 +561,7 @@ typedef struct SyncLocalCmd {
int32_t cmd; int32_t cmd;
SyncTerm sdNewTerm; // step down new term SyncTerm sdNewTerm; // step down new term
SyncIndex fcIndex;// follower commit index SyncIndex fcIndex; // follower commit index
} SyncLocalCmd; } SyncLocalCmd;
...@@ -628,6 +610,7 @@ bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); ...@@ -628,6 +610,7 @@ bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
// --------------------------------------------- // ---------------------------------------------
SyncTimeout* syncTimeoutBuildX();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -34,7 +34,7 @@ extern "C" { ...@@ -34,7 +34,7 @@ extern "C" {
// /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]] // /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]]
// /\ UNCHANGED <<messages, leaderVars, logVars>> // /\ UNCHANGED <<messages, leaderVars, logVars>>
// //
int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg); int32_t syncNodeOnTimer(SSyncNode* ths, SRpcMsg* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -142,9 +142,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { ...@@ -142,9 +142,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg); code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
syncHeartbeatReplyDestroy(pSyncMsg); syncHeartbeatReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { } else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout* pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); code = syncNodeOnTimer(pSyncNode, pMsg);
code = syncNodeOnTimer(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL); code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
...@@ -1797,70 +1795,67 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex ...@@ -1797,70 +1795,67 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex
} }
static void syncNodeEqPingTimer(void* param, void* tmrId) { static void syncNodeEqPingTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param; if (!syncIsInit()) return;
if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), SSyncNode* pNode = param;
pSyncNode->pingTimerMS, pSyncNode->vgId, pSyncNode); if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
sNTrace(pSyncNode, "enqueue ping timer"); pNode->pingTimerMS, pNode);
if (pSyncNode->syncEqMsg != NULL) {
int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code); sNError(pNode, "failed to build ping msg");
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
syncTimeoutDestroy(pSyncMsg);
return; return;
} }
} else {
sTrace("syncNodeEqPingTimer pSyncNode->syncEqMsg is NULL");
}
syncTimeoutDestroy(pSyncMsg);
if (syncIsInit()) { sNTrace(pNode, "enqueue ping msg");
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager, code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
&pSyncNode->pPingTimer); if (code != 0) {
} else { sNError(pNode, "failed to sync enqueue ping msg since %s", terrstr());
sError("sync env is stop, syncNodeEqPingTimer"); rpcFreeCont(rpcMsg.pCont);
return;
} }
taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
} else { } else {
sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRId64 ", pingTimerLogicClockUser:%" PRId64, sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRId64 ", pingTimerLogicClockUser:%" PRId64,
pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); pNode->pingTimerLogicClock, pNode->pingTimerLogicClockUser);
} }
} }
static void syncNodeEqElectTimer(void* param, void* tmrId) { static void syncNodeEqElectTimer(void* param, void* tmrId) {
SElectTimer* pElectTimer = (SElectTimer*)param; if (!syncIsInit()) return;
SSyncNode* pSyncNode = pElectTimer->pSyncNode;
SElectTimer* pElectTimer = param;
SSyncNode* pNode = pElectTimer->pSyncNode;
SRpcMsg rpcMsg = {0};
int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pSyncNode->electTimerMS,
pSyncNode->vgId, pSyncNode);
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
if (pSyncNode->syncEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) {
int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); sNError(pNode, "failed to build elect msg");
rpcFreeCont(rpcMsg.pCont);
syncTimeoutDestroy(pSyncMsg);
taosMemoryFree(pElectTimer); taosMemoryFree(pElectTimer);
return; return;
} }
sNTrace(pSyncNode, "eq elect timer lc:%" PRId64, pSyncMsg->logicClock);
} else { SyncTimeout* pTimeout = rpcMsg.pCont;
sTrace("syncNodeEqElectTimer syncEqMsg is NULL"); sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
if (code != 0) {
sNError(pNode, "failed to sync enqueue elect msg since %s", terrstr());
rpcFreeCont(rpcMsg.pCont);
taosMemoryFree(pElectTimer);
return;
} }
syncTimeoutDestroy(pSyncMsg);
taosMemoryFree(pElectTimer); taosMemoryFree(pElectTimer);
#if 0 #if 0
// reset timer ms // reset timer ms
if (syncIsInit() && pSyncNode->electBaseLine > 0) { if (syncIsInit() && pNode->electBaseLine > 0) {
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); pNode->electTimerMS = syncUtilElectRandomMS(pNode->electBaseLine, 2 * pNode->electBaseLine);
taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager, taosTmrReset(syncNodeEqElectTimer, pNode->electTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pElectTimer);
&pSyncNode->pElectTimer);
} else { } else {
sError("sync env is stop, syncNodeEqElectTimer"); sError("sync env is stop, syncNodeEqElectTimer");
} }
...@@ -1868,41 +1863,34 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { ...@@ -1868,41 +1863,34 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
} }
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param; if (!syncIsInit()) return;
sNTrace(pSyncNode, "eq hb timer");
SSyncNode* pNode = param;
if (pNode->replicaNum > 1) {
if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
SRpcMsg rpcMsg = {0};
int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
pNode->heartbeatTimerMS, pNode);
if (pSyncNode->replicaNum > 1) {
if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
SyncTimeout* pSyncMsg =
syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
pSyncNode->heartbeatTimerMS, pSyncNode->vgId, pSyncNode);
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
sNTrace(pSyncNode, "enqueue heartbeat timer");
if (pSyncNode->syncEqMsg != NULL) {
int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code); sNError(pNode, "failed to build heartbeat msg");
rpcFreeCont(rpcMsg.pCont);
syncTimeoutDestroy(pSyncMsg);
return; return;
} }
} else {
sError("vgId:%d, enqueue msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
}
syncTimeoutDestroy(pSyncMsg);
if (syncIsInit()) { sNTrace(pNode, "enqueue heartbeat timer");
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager, code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
&pSyncNode->pHeartbeatTimer); if (code != 0) {
} else { sNError(pNode, "failed to enqueue heartbeat msg since %s", terrstr());
sError("sync env is stop, syncNodeEqHeartbeatTimer"); rpcFreeCont(rpcMsg.pCont);
return;
} }
taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
&pNode->pHeartbeatTimer);
} else { } else {
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64 sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
"", pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
} }
} }
} }
......
...@@ -20,139 +20,28 @@ ...@@ -20,139 +20,28 @@
#include "syncUtil.h" #include "syncUtil.h"
#include "tcoding.h" #include "tcoding.h"
// ---- message process SyncTimeout---- int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
SyncTimeout* syncTimeoutBuild() { SSyncNode* pNode) {
uint32_t bytes = sizeof(SyncTimeout); int32_t bytes = sizeof(SyncTimeout);
SyncTimeout* pMsg = taosMemoryMalloc(bytes); pTimeoutRpcMsg->pCont = rpcMallocCont(bytes);
memset(pMsg, 0, bytes); pTimeoutRpcMsg->msgType = TDMT_SYNC_TIMEOUT;
pMsg->bytes = bytes; pTimeoutRpcMsg->contLen = bytes;
pMsg->msgType = TDMT_SYNC_TIMEOUT; if (pTimeoutRpcMsg->pCont == NULL) {
return pMsg; terrno = TSDB_CODE_OUT_OF_MEMORY;
} return -1;
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t vgId,
void* data) {
SyncTimeout* pMsg = syncTimeoutBuild();
pMsg->vgId = vgId;
pMsg->timeoutType = timeoutType;
pMsg->logicClock = logicClock;
pMsg->timerMS = timerMS;
pMsg->data = data;
return pMsg;
}
void syncTimeoutDestroy(SyncTimeout* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncTimeoutSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncTimeout* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncTimeoutDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncTimeoutSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) {
syncTimeoutDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncTimeout* pMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->logicClock);
cJSON_AddStringToObject(pRoot, "logicClock", u64buf);
cJSON_AddNumberToObject(pRoot, "timerMS", pMsg->timerMS);
snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data);
cJSON_AddStringToObject(pRoot, "data", u64buf);
} }
cJSON* pJson = cJSON_CreateObject(); SyncTimeout* pTimeout = pTimeoutRpcMsg->pCont;
cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot); pTimeout->bytes = bytes;
return pJson; pTimeout->msgType = TDMT_SYNC_TIMEOUT;
} pTimeout->vgId = pNode->vgId;
pTimeout->timeoutType = timeoutType;
char* syncTimeout2Str(const SyncTimeout* pMsg) { pTimeout->logicClock = logicClock;
cJSON* pJson = syncTimeout2Json(pMsg); pTimeout->timerMS = timerMS;
char* serialized = cJSON_Print(pJson); pTimeout->data = pNode;
cJSON_Delete(pJson); return 0;
return serialized;
}
// for debug ----------------------
void syncTimeoutPrint(const SyncTimeout* pMsg) {
char* serialized = syncTimeout2Str(pMsg);
printf("syncTimeoutPrint | len:%zu | %s \n", strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg) {
char* serialized = syncTimeout2Str(pMsg);
printf("syncTimeoutPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncTimeoutLog(const SyncTimeout* pMsg) {
char* serialized = syncTimeout2Str(pMsg);
sTrace("syncTimeoutLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncTimeout2Str(pMsg);
sTrace("syncTimeoutLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
} }
// ---- message process SyncClientRequest----
SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) { SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncClientRequest) + dataLen; uint32_t bytes = sizeof(SyncClientRequest) + dataLen;
SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes); SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes);
...@@ -166,6 +55,8 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR ...@@ -166,6 +55,8 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR
bool isWeak, int32_t vgId) { bool isWeak, int32_t vgId) {
int32_t bytes = sizeof(SyncClientRequest) + pOriginalRpcMsg->contLen; int32_t bytes = sizeof(SyncClientRequest) + pOriginalRpcMsg->contLen;
pClientRequestRpcMsg->pCont = rpcMallocCont(bytes); pClientRequestRpcMsg->pCont = rpcMallocCont(bytes);
pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
pClientRequestRpcMsg->contLen = bytes;
if (pClientRequestRpcMsg->pCont == NULL) { if (pClientRequestRpcMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -181,14 +72,15 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR ...@@ -181,14 +72,15 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR
pClientRequest->dataLen = pOriginalRpcMsg->contLen; pClientRequest->dataLen = pOriginalRpcMsg->contLen;
memcpy(pClientRequest->data, (char*)pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); memcpy(pClientRequest->data, (char*)pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen);
pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
pClientRequestRpcMsg->contLen = bytes;
return 0; return 0;
} }
int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId) { int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId) {
int32_t bytes = sizeof(SyncClientRequest) + pEntry->bytes; int32_t bytes = sizeof(SyncClientRequest) + pEntry->bytes;
pClientRequestRpcMsg->pCont = rpcMallocCont(bytes); pClientRequestRpcMsg->pCont = rpcMallocCont(bytes);
pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
pClientRequestRpcMsg->contLen = bytes;
if (pClientRequestRpcMsg->pCont == NULL) { if (pClientRequestRpcMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -202,8 +94,6 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const ...@@ -202,8 +94,6 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const
pClientRequest->dataLen = pEntry->bytes; pClientRequest->dataLen = pEntry->bytes;
memcpy(pClientRequest->data, (char*)pEntry, pEntry->bytes); memcpy(pClientRequest->data, (char*)pEntry, pEntry->bytes);
pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
pClientRequestRpcMsg->contLen = bytes;
return 0; return 0;
} }
......
...@@ -85,8 +85,10 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { ...@@ -85,8 +85,10 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
return 0; return 0;
} }
int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) { int32_t syncNodeOnTimer(SSyncNode* ths, SRpcMsg* pRpc) {
int32_t ret = 0; int32_t ret = 0;
SyncTimeout* pMsg = pRpc->pCont;
syncLogRecvTimer(ths, pMsg, ""); syncLogRecvTimer(ths, pMsg, "");
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
......
...@@ -28,7 +28,7 @@ void test2() { ...@@ -28,7 +28,7 @@ void test2() {
uint32_t len = pMsg->bytes; uint32_t len = pMsg->bytes;
char *serialized = (char *)taosMemoryMalloc(len); char *serialized = (char *)taosMemoryMalloc(len);
syncTimeoutSerialize(pMsg, serialized, len); syncTimeoutSerialize(pMsg, serialized, len);
SyncTimeout *pMsg2 = syncTimeoutBuild(); SyncTimeout *pMsg2 = syncTimeoutBuildX();
syncTimeoutDeserialize(serialized, len, pMsg2); syncTimeoutDeserialize(serialized, len, pMsg2);
syncTimeoutLog2((char *)"test2: syncTimeoutSerialize -> syncTimeoutDeserialize ", pMsg2); syncTimeoutLog2((char *)"test2: syncTimeoutSerialize -> syncTimeoutDeserialize ", pMsg2);
......
...@@ -213,6 +213,24 @@ int32_t syncNodePingSelf(SSyncNode* pSyncNode); ...@@ -213,6 +213,24 @@ int32_t syncNodePingSelf(SSyncNode* pSyncNode);
int32_t syncNodePingPeers(SSyncNode* pSyncNode); int32_t syncNodePingPeers(SSyncNode* pSyncNode);
int32_t syncNodePingAll(SSyncNode* pSyncNode); int32_t syncNodePingAll(SSyncNode* pSyncNode);
SyncTimeout* syncTimeoutBuildX();
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t vgId,
void* data);
void syncTimeoutDestroy(SyncTimeout* pMsg);
void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen);
void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg);
char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len);
SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len);
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg);
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg);
SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncTimeout2Json(const SyncTimeout* pMsg);
char* syncTimeout2Str(const SyncTimeout* pMsg);
void syncTimeoutPrint(const SyncTimeout* pMsg);
void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg);
void syncTimeoutLog(const SyncTimeout* pMsg);
void syncTimeoutLog2(char* s, const SyncTimeout* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -1046,3 +1046,134 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) { ...@@ -1046,3 +1046,134 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) {
taosMemoryFree(serialized); taosMemoryFree(serialized);
} }
} }
// ---- message process SyncTimeout----
SyncTimeout* syncTimeoutBuildX() {
uint32_t bytes = sizeof(SyncTimeout);
SyncTimeout* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_SYNC_TIMEOUT;
return pMsg;
}
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t vgId,
void* data) {
SyncTimeout* pMsg = syncTimeoutBuildX();
pMsg->vgId = vgId;
pMsg->timeoutType = timeoutType;
pMsg->logicClock = logicClock;
pMsg->timerMS = timerMS;
pMsg->data = data;
return pMsg;
}
char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncTimeoutSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
void syncTimeoutDestroy(SyncTimeout* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncTimeout* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncTimeoutDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncTimeoutSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) {
syncTimeoutDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncTimeout* pMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->logicClock);
cJSON_AddStringToObject(pRoot, "logicClock", u64buf);
cJSON_AddNumberToObject(pRoot, "timerMS", pMsg->timerMS);
snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data);
cJSON_AddStringToObject(pRoot, "data", u64buf);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot);
return pJson;
}
char* syncTimeout2Str(const SyncTimeout* pMsg) {
cJSON* pJson = syncTimeout2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncTimeoutPrint(const SyncTimeout* pMsg) {
char* serialized = syncTimeout2Str(pMsg);
printf("syncTimeoutPrint | len:%zu | %s \n", strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg) {
char* serialized = syncTimeout2Str(pMsg);
printf("syncTimeoutPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncTimeoutLog(const SyncTimeout* pMsg) {
char* serialized = syncTimeout2Str(pMsg);
sTrace("syncTimeoutLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncTimeout2Str(pMsg);
sTrace("syncTimeoutLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册