提交 8deb3b83 编写于 作者: S Shengliang Guan

refact: adjust timeout msg build

上级 60fd3c4e
...@@ -54,20 +54,9 @@ typedef struct SyncClientRequest { ...@@ -54,20 +54,9 @@ typedef struct SyncClientRequest {
char data[]; // origin RpcMsg.pCont char data[]; // origin RpcMsg.pCont
} SyncClientRequest; } SyncClientRequest;
SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen);
int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum,
bool isWeak, int32_t vgId); bool isWeak, int32_t vgId);
int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId); int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); // step 2
void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg);
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg);
char* syncClientRequest2Str(const SyncClientRequest* pMsg);
// for debug ----------------------
void syncClientRequestPrint(const SyncClientRequest* pMsg);
void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg);
void syncClientRequestLog(const SyncClientRequest* pMsg);
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
// --------------------------------------------- // ---------------------------------------------
typedef struct SRaftMeta { typedef struct SRaftMeta {
...@@ -609,9 +598,6 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg); ...@@ -609,9 +598,6 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg);
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
// ---------------------------------------------
SyncTimeout* syncTimeoutBuildX();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -34,7 +34,7 @@ extern "C" { ...@@ -34,7 +34,7 @@ extern "C" {
// /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]] // /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]]
// /\ UNCHANGED <<messages, leaderVars, logVars>> // /\ UNCHANGED <<messages, leaderVars, logVars>>
// //
int32_t syncNodeOnTimer(SSyncNode* ths, SRpcMsg* pMsg); int32_t syncNodeOnTimer(SSyncNode* ths, const SRpcMsg* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -42,15 +42,6 @@ int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, ...@@ -42,15 +42,6 @@ int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType,
return 0; return 0;
} }
SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncClientRequest) + dataLen;
SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
pMsg->dataLen = dataLen;
return pMsg;
}
int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum,
bool isWeak, int32_t vgId) { bool isWeak, int32_t vgId) {
int32_t bytes = sizeof(SyncClientRequest) + pOriginalRpcMsg->contLen; int32_t bytes = sizeof(SyncClientRequest) + pOriginalRpcMsg->contLen;
...@@ -97,41 +88,6 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const ...@@ -97,41 +88,6 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const
return 0; return 0;
} }
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->seqNum);
cJSON_AddStringToObject(pRoot, "seqNum", u64buf);
cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
char* s;
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
taosMemoryFree(s);
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
taosMemoryFree(s);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncClientRequest", pRoot);
return pJson;
}
char* syncClientRequest2Str(const SyncClientRequest* pMsg) {
cJSON* pJson = syncClientRequest2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// ---- message process SyncClientRequestBatch---- // ---- message process SyncClientRequestBatch----
// block1: // block1:
......
...@@ -85,7 +85,7 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { ...@@ -85,7 +85,7 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
return 0; return 0;
} }
int32_t syncNodeOnTimer(SSyncNode* ths, SRpcMsg* pRpc) { int32_t syncNodeOnTimer(SSyncNode* ths, const SRpcMsg* pRpc) {
int32_t ret = 0; int32_t ret = 0;
SyncTimeout* pMsg = pRpc->pCont; SyncTimeout* pMsg = pRpc->pCont;
......
...@@ -231,6 +231,16 @@ void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg); ...@@ -231,6 +231,16 @@ void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg);
void syncTimeoutLog(const SyncTimeout* pMsg); void syncTimeoutLog(const SyncTimeout* pMsg);
void syncTimeoutLog2(char* s, const SyncTimeout* pMsg); void syncTimeoutLog2(char* s, const SyncTimeout* pMsg);
SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen);
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); // step 2
void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg);
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg);
char* syncClientRequest2Str(const SyncClientRequest* pMsg);
void syncClientRequestPrint(const SyncClientRequest* pMsg);
void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg);
void syncClientRequestLog(const SyncClientRequest* pMsg);
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -938,6 +938,50 @@ void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg) { ...@@ -938,6 +938,50 @@ void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg) {
} }
} }
SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncClientRequest) + dataLen;
SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
pMsg->dataLen = dataLen;
return pMsg;
}
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->seqNum);
cJSON_AddStringToObject(pRoot, "seqNum", u64buf);
cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
char* s;
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
taosMemoryFree(s);
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
taosMemoryFree(s);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncClientRequest", pRoot);
return pJson;
}
char* syncClientRequest2Str(const SyncClientRequest* pMsg) {
cJSON* pJson = syncClientRequest2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg) { cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg) {
char u64buf[128] = {0}; char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject(); cJSON* pRoot = cJSON_CreateObject();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册