diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index c83082e3e4172dec1e06d30e96be9d47411657f7..1e5aa1801082eb7c615f4988f41cbc68b01a157b 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -154,7 +154,7 @@ void syncCleanUp(); int64_t syncStart(const SSyncInfo* pSyncInfo); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); ESyncState syncGetMyRole(int64_t rid); void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 3405f0f6cc14d810be052ed052f821e0fab2a27f..7f54dee426b193cc703240ea5a2f502fa2a91af5 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -123,12 +123,25 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId); typedef struct SyncClientRequest { uint32_t bytes; uint32_t msgType; - int64_t seqNum; + uint32_t originalRpcType; + uint64_t seqNum; bool isWeak; uint32_t dataLen; char data[]; } SyncClientRequest; +#define SYNC_CLIENT_REQUEST_FIX_LEN \ + (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t) + sizeof(bool) + sizeof(uint32_t)) + +SyncClientRequest* syncClientRequestBuild(uint32_t dataLen); +void syncClientRequestDestroy(SyncClientRequest* pMsg); +void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen); +void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg); +void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); +void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg); +cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg); +SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak); + // --------------------------------------------- typedef struct SyncClientRequestReply { uint32_t bytes; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4d19444abd4b1c3c1b1af9037f1de30a69a90354..7e8754b6840791214c98c7487fd3b5f0bd2570c2 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -60,13 +60,32 @@ int64_t syncStart(const SSyncInfo* pSyncInfo) { return 0; } -void syncStop(int64_t rid) {} +void syncStop(int64_t rid) { + SSyncNode* pSyncNode = NULL; // get pointer from rid + syncNodeClose(pSyncNode); +} int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; } -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; } +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { + SSyncNode* pSyncNode = NULL; // get pointer from rid + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak); + SRpcMsg rpcMsg; + syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + syncClientRequestDestroy(pSyncMsg); + } else { + sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state)); + return -1; // need define err code !! + } + return 0; +} -ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } +ESyncState syncGetMyRole(int64_t rid) { + SSyncNode* pSyncNode = NULL; // get pointer from rid + return pSyncNode->state; +} void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 6732cd39588d08a10a0b8a43b6e90c7166fb78f4..3af073541bf81cc7c28fc854b03851cbdde17f53 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -35,7 +35,8 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { pRoot = syncPingReply2Json(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) { - pRoot = syncRpcUnknownMsg2Json(); + SyncClientRequest* pSyncMsg = (SyncClientRequest*)pRpcMsg->pCont; + pRoot = syncClientRequest2Json(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) { pRoot = syncRpcUnknownMsg2Json(); @@ -148,6 +149,7 @@ SyncPing* syncPingBuild(uint32_t dataLen) { pMsg->bytes = bytes; pMsg->msgType = SYNC_PING; pMsg->dataLen = dataLen; + return pMsg; } void syncPingDestroy(SyncPing* pMsg) { @@ -246,6 +248,7 @@ SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { pMsg->bytes = bytes; pMsg->msgType = SYNC_PING_REPLY; pMsg->dataLen = dataLen; + return pMsg; } void syncPingReplyDestroy(SyncPingReply* pMsg) { @@ -336,6 +339,73 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) return pMsg; } +// ---- message process SyncClientRequest---- +SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { + uint32_t bytes = SYNC_CLIENT_REQUEST_FIX_LEN + dataLen; + SyncClientRequest* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_CLIENT_REQUEST; + pMsg->seqNum = 0; + pMsg->isWeak = false; + pMsg->dataLen = dataLen; + return pMsg; +} + +void syncClientRequestDestroy(SyncClientRequest* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncClientRequestSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg) { + syncClientRequestDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->seqNum); + cJSON_AddStringToObject(pRoot, "seqNum", u64buf); + cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak); + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncClientRequest", pRoot); + return pJson; +} + +SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak) { + SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen); + pMsg->originalRpcType = pOriginalRpcMsg->msgType; + pMsg->seqNum = seqNum; + pMsg->isWeak = isWeak; + memcpy(pMsg->data, pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); + return pMsg; +} + // ---- message process SyncRequestVote---- SyncRequestVote* syncRequestVoteBuild() { uint32_t bytes = sizeof(SyncRequestVote); @@ -343,6 +413,7 @@ SyncRequestVote* syncRequestVoteBuild() { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->msgType = SYNC_REQUEST_VOTE; + return pMsg; } void syncRequestVoteDestroy(SyncRequestVote* pMsg) { @@ -428,6 +499,7 @@ SyncRequestVoteReply* SyncRequestVoteReplyBuild() { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->msgType = SYNC_REQUEST_VOTE_REPLY; + return pMsg; } void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) { @@ -511,6 +583,7 @@ SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen) { pMsg->bytes = bytes; pMsg->msgType = SYNC_APPEND_ENTRIES; pMsg->dataLen = dataLen; + return pMsg; } void syncAppendEntriesDestroy(SyncAppendEntries* pMsg) { @@ -603,6 +676,7 @@ SyncAppendEntriesReply* syncAppendEntriesReplyBuild() { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->msgType = SYNC_APPEND_ENTRIES_REPLY; + return pMsg; } void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) {