提交 d8741111 编写于 作者: M Minghao Li

sync refactor

上级 fa7f441f
......@@ -154,7 +154,7 @@ void syncCleanUp();
int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
ESyncState syncGetMyRole(int64_t rid);
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole);
......
......@@ -123,12 +123,25 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId);
typedef struct SyncClientRequest {
uint32_t bytes;
uint32_t msgType;
int64_t seqNum;
uint32_t originalRpcType;
uint64_t seqNum;
bool isWeak;
uint32_t dataLen;
char data[];
} SyncClientRequest;
#define SYNC_CLIENT_REQUEST_FIX_LEN \
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t) + sizeof(bool) + sizeof(uint32_t))
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen);
void syncClientRequestDestroy(SyncClientRequest* pMsg);
void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen);
void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg);
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg);
void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg);
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg);
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak);
// ---------------------------------------------
typedef struct SyncClientRequestReply {
uint32_t bytes;
......
......@@ -60,13 +60,32 @@ int64_t syncStart(const SSyncInfo* pSyncInfo) {
return 0;
}
void syncStop(int64_t rid) {}
void syncStop(int64_t rid) {
SSyncNode* pSyncNode = NULL; // get pointer from rid
syncNodeClose(pSyncNode);
}
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
SSyncNode* pSyncNode = NULL; // get pointer from rid
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak);
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncClientRequestDestroy(pSyncMsg);
} else {
sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state));
return -1; // need define err code !!
}
return 0;
}
ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }
ESyncState syncGetMyRole(int64_t rid) {
SSyncNode* pSyncNode = NULL; // get pointer from rid
return pSyncNode->state;
}
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}
......
......@@ -35,7 +35,8 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
pRoot = syncPingReply2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) {
pRoot = syncRpcUnknownMsg2Json();
SyncClientRequest* pSyncMsg = (SyncClientRequest*)pRpcMsg->pCont;
pRoot = syncClientRequest2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) {
pRoot = syncRpcUnknownMsg2Json();
......@@ -148,6 +149,7 @@ SyncPing* syncPingBuild(uint32_t dataLen) {
pMsg->bytes = bytes;
pMsg->msgType = SYNC_PING;
pMsg->dataLen = dataLen;
return pMsg;
}
void syncPingDestroy(SyncPing* pMsg) {
......@@ -246,6 +248,7 @@ SyncPingReply* syncPingReplyBuild(uint32_t dataLen) {
pMsg->bytes = bytes;
pMsg->msgType = SYNC_PING_REPLY;
pMsg->dataLen = dataLen;
return pMsg;
}
void syncPingReplyDestroy(SyncPingReply* pMsg) {
......@@ -336,6 +339,73 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId)
return pMsg;
}
// ---- message process SyncClientRequest----
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) {
uint32_t bytes = SYNC_CLIENT_REQUEST_FIX_LEN + dataLen;
SyncClientRequest* pMsg = malloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = SYNC_CLIENT_REQUEST;
pMsg->seqNum = 0;
pMsg->isWeak = false;
pMsg->dataLen = dataLen;
return pMsg;
}
void syncClientRequestDestroy(SyncClientRequest* pMsg) {
if (pMsg != NULL) {
free(pMsg);
}
}
void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen) {
assert(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg) {
memcpy(pMsg, buf, len);
assert(len == pMsg->bytes);
}
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncClientRequestSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg) {
syncClientRequestDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) {
char u64buf[128];
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->seqNum);
cJSON_AddStringToObject(pRoot, "seqNum", u64buf);
cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncClientRequest", pRoot);
return pJson;
}
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak) {
SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen);
pMsg->originalRpcType = pOriginalRpcMsg->msgType;
pMsg->seqNum = seqNum;
pMsg->isWeak = isWeak;
memcpy(pMsg->data, pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen);
return pMsg;
}
// ---- message process SyncRequestVote----
SyncRequestVote* syncRequestVoteBuild() {
uint32_t bytes = sizeof(SyncRequestVote);
......@@ -343,6 +413,7 @@ SyncRequestVote* syncRequestVoteBuild() {
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = SYNC_REQUEST_VOTE;
return pMsg;
}
void syncRequestVoteDestroy(SyncRequestVote* pMsg) {
......@@ -428,6 +499,7 @@ SyncRequestVoteReply* SyncRequestVoteReplyBuild() {
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = SYNC_REQUEST_VOTE_REPLY;
return pMsg;
}
void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) {
......@@ -511,6 +583,7 @@ SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen) {
pMsg->bytes = bytes;
pMsg->msgType = SYNC_APPEND_ENTRIES;
pMsg->dataLen = dataLen;
return pMsg;
}
void syncAppendEntriesDestroy(SyncAppendEntries* pMsg) {
......@@ -603,6 +676,7 @@ SyncAppendEntriesReply* syncAppendEntriesReplyBuild() {
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = SYNC_APPEND_ENTRIES_REPLY;
return pMsg;
}
void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册