diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index c6f4a4f5c048efcdfae15be08b975ecbd956007a..cd04783dbcc368e72fffe792c3eb18b06c9425fb 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -71,15 +71,15 @@ typedef struct SSyncFSM { // when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result // user can do something according to the code and isWeak. for example, write data into tsdb - void (*FpCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); // when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result // user can do something according to the code and isWeak. for example, write data into tsdb - void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); // when log entry is updated by a new one, FpRollBackCb is called // user can do something to roll back. for example, delete data from tsdb, or just ignore it - void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); // user should implement this function, use "data" to take snapshot into "snapshot" int32_t (*FpTakeSnapshot)(SSnapshot* snapshot); @@ -95,10 +95,10 @@ typedef struct SSyncLogStore { void* data; // append one log entry - int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); + int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pBuf); // get one log entry, user need to free pBuf->data - int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf); + int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pBuf); // update log store commit index with "index" int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); @@ -153,8 +153,8 @@ int64_t syncStart(const SSyncInfo* pSyncInfo); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); -// int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); -int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak); +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); +// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak); ESyncState syncGetMyRole(int64_t rid); void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index b2922a867625e1b5999c7372a0da1acfd9931a25..82108acf7b95de38e1987e21e636674cc846c1a0 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -92,8 +92,8 @@ struct SSyncEnv; typedef struct SSyncEnv SSyncEnv; typedef struct SRaftId { - SyncNodeId addr; - SyncGroupId vgId; + SyncNodeId addr; // typedef uint64_t SyncNodeId; + SyncGroupId vgId; // typedef int32_t SyncGroupId; } SRaftId; typedef struct SSyncNode { @@ -133,17 +133,17 @@ typedef struct SSyncNode { uint64_t heartbeatTimerCounter; // callback - int32_t (*FpOnPing)(struct SSyncNode* ths, SyncPing* pMsg); + int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg); - int32_t (*FpOnPingReply)(struct SSyncNode* ths, SyncPingReply* pMsg); + int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg); - int32_t (*FpOnRequestVote)(struct SSyncNode* ths, SyncRequestVote* pMsg); + int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg); - int32_t (*FpOnRequestVoteReply)(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); + int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); - int32_t (*FpOnAppendEntries)(struct SSyncNode* ths, SyncAppendEntries* pMsg); + int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); - int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); + int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); // passed from outside void* rpcClient; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 2ee5e0109c447f37a87eff474d2255cb33f1bde7..0de8e8cf4bb984bc3fe0b73edb95786c5ae3e182 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -27,6 +27,7 @@ extern "C" { #include "syncRaftEntry.h" #include "taosdef.h" +// encode as uint64 typedef enum ESyncMessageType { SYNC_PING = 0, SYNC_PING_REPLY, @@ -38,29 +39,47 @@ typedef enum ESyncMessageType { SYNC_APPEND_ENTRIES_REPLY, } ESyncMessageType; +/* +typedef struct SRaftId { + SyncNodeId addr; // typedef uint64_t SyncNodeId; + SyncGroupId vgId; // typedef int32_t SyncGroupId; +} SRaftId; +*/ + typedef struct SyncPing { - ESyncMessageType msgType; - const SSyncBuffer *pData; -} SyncPing, RaftPing; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + uint32_t dataLen; + char* data; +} SyncPing; + +#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) typedef struct SyncPingReply { - ESyncMessageType msgType; - const SSyncBuffer *pData; -} SyncPingReply, RaftPingReply; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + uint32_t dataLen; + char* data; +} SyncPingReply; typedef struct SyncClientRequest { - ESyncMessageType msgType; - const SSyncBuffer *pData; - int64_t seqNum; - bool isWeak; -} SyncClientRequest, RaftClientRequest; + ESyncMessageType msgType; + char* data; + uint32_t dataLen; + int64_t seqNum; + bool isWeak; +} SyncClientRequest; typedef struct SyncClientRequestReply { - ESyncMessageType msgType; - int32_t errCode; - const SSyncBuffer *pErrMsg; - const SSyncBuffer *pLeaderHint; -} SyncClientRequestReply, RaftClientRequestReply; + ESyncMessageType msgType; + int32_t errCode; + SSyncBuffer* pErrMsg; + SSyncBuffer* pLeaderHint; +} SyncClientRequestReply; typedef struct SyncRequestVote { ESyncMessageType msgType; @@ -69,7 +88,7 @@ typedef struct SyncRequestVote { SyncGroupId vgId; SyncIndex lastLogIndex; SyncTerm lastLogTerm; -} SyncRequestVote, RaftRequestVote; +} SyncRequestVote; typedef struct SyncRequestVoteReply { ESyncMessageType msgType; @@ -77,7 +96,7 @@ typedef struct SyncRequestVoteReply { SyncNodeId nodeId; SyncGroupId vgId; bool voteGranted; -} SyncRequestVoteReply, RaftRequestVoteReply; +} SyncRequestVoteReply; typedef struct SyncAppendEntries { ESyncMessageType msgType; @@ -86,9 +105,9 @@ typedef struct SyncAppendEntries { SyncIndex prevLogIndex; SyncTerm prevLogTerm; int32_t entryCount; - SSyncRaftEntry * logEntries; + SSyncRaftEntry* logEntries; SyncIndex commitIndex; -} SyncAppendEntries, RaftAppendEntries; +} SyncAppendEntries; typedef struct SyncAppendEntriesReply { ESyncMessageType msgType; @@ -96,7 +115,18 @@ typedef struct SyncAppendEntriesReply { SyncNodeId nodeId; bool success; SyncIndex matchIndex; -} SyncAppendEntriesReply, RaftAppendEntriesReply; +} SyncAppendEntriesReply; + +// ---- message build ---- +SyncPing* syncPingBuild(uint32_t dataLen); + +void syncPingDestroy(SyncPing* pSyncPing); + +void syncPingSerialize(const SyncPing* pSyncPing, char* buf, uint32_t bufLen); + +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pSyncPing); + +void syncPing2RpcMsg(const SyncPing* pSyncPing, SRpcMsg* pRpcMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index f3d4c2ed07f28967922345138c752772ce7305b8..93d2c125254766ccfdd881a8afafd303b45f91fb 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -27,11 +27,28 @@ extern "C" { #include "syncMessage.h" #include "taosdef.h" -void nodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); +// ---- encode / decode -void raftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); +uint64_t syncUtilAddr2U64(const char* host, uint16_t port); -void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); +void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port); + +void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); + +void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); + +void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); + +// ---- SSyncBuffer ---- +#if 0 +void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len); + +void syncUtilbufDestroy(SSyncBuffer* syncBuf); + +void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest); + +void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); +#endif #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9b7d068ada0183493ed6fac2ddffffb72a34121a..4d9e5887cd79c7f215049c4f75ce3f4f7464b268 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -23,19 +23,20 @@ static int32_t tsNodeRefId = -1; // ------ local funciton --------- -static int32_t doSyncNodeSendMsgById(SRaftId* destRaftId, struct SSyncNode* pSyncNode, SRpcMsg* pMsg); -static int32_t doSyncNodeSendMsgByInfo(SNodeInfo* nodeInfo, struct SSyncNode* pSyncNode, SRpcMsg* pMsg); - -static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg); -static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg); -static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg); -static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg); -static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg); -static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); -static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg); -static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg); -static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); +static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); +static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); static void syncNodePingTimerCb(void* param, void* tmrId); + +static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); +static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); +static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); + +static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); +static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); +static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); +static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); +static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); +static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); // --------------------------------- int32_t syncInit() { @@ -55,7 +56,9 @@ void syncStop(int64_t rid) {} int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; } -int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; } +// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; } + +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; } ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } @@ -75,12 +78,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; - pSyncNode->FpOnPing = onSyncNodePing; - pSyncNode->FpOnPingReply = onSyncNodePingReply; - pSyncNode->FpOnRequestVote = onSyncNodeRequestVote; - pSyncNode->FpOnRequestVoteReply = onSyncNodeRequestVoteReply; - pSyncNode->FpOnAppendEntries = onSyncNodeAppendEntries; - pSyncNode->FpOnAppendEntriesReply = onSyncNodeAppendEntriesReply; + pSyncNode->FpOnPing = syncNodeOnPingCb; + pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; + pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; + pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; + pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; + pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb; return pSyncNode; } @@ -92,13 +95,35 @@ void syncNodeClose(SSyncNode* pSyncNode) { void syncNodePingAll(SSyncNode* pSyncNode) { sTrace("syncNodePingAll %p ", pSyncNode); - SyncPing msg; - doSyncNodePing(pSyncNode, &msg); + int32_t ret = 0; + for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) { + SyncPing* pSyncPing; + SRaftId raftId; + syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &raftId); + ret = syncNodePing(pSyncNode, &raftId, pSyncPing); + assert(ret == 0); + } } -void syncNodePingPeers(SSyncNode* pSyncNode) {} +void syncNodePingPeers(SSyncNode* pSyncNode) { + int32_t ret = 0; + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SyncPing* pSyncPing; + SRaftId raftId; + syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &raftId); + ret = syncNodePing(pSyncNode, &raftId, pSyncPing); + assert(ret == 0); + } +} -void syncNodePingSelf(SSyncNode* pSyncNode) {} +void syncNodePingSelf(SSyncNode* pSyncNode) { + int32_t ret = 0; + SyncPing* pSyncPing; + SRaftId raftId; + syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &raftId); + ret = syncNodePing(pSyncNode, &raftId, pSyncPing); + assert(ret == 0); +} int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { if (pSyncNode->pPingTimer == NULL) { @@ -120,69 +145,64 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { } // ------ local funciton --------- - -static int32_t doSyncNodeSendMsgById(SRaftId* destRaftId, struct SSyncNode* pSyncNode, SRpcMsg* pMsg) { - SEpSet epSet; - raftId2EpSet(destRaftId, &epSet); - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); - return 0; -} - -static int32_t doSyncNodeSendMsgByInfo(SNodeInfo* nodeInfo, struct SSyncNode* pSyncNode, SRpcMsg* pMsg) { - SEpSet epSet; - nodeInfo2EpSet(nodeInfo, &epSet); - - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); - return 0; -} - -static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) { - int32_t ret; - for (int i = 0; i < ths->syncCfg.replicaNum; ++i) { - SRpcMsg* rpcMsg; - syncPing2RpcMsg(pMsg, rpcMsg); - doSyncNodeSendMsgByInfo(&ths->syncCfg.nodeInfo[i], ths, rpcMsg); - } - +static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { + int32_t ret = 0; + SRpcMsg* rpcMsg; + syncPing2RpcMsg(pMsg, rpcMsg); + syncNodeSendMsgById(destRaftId, pSyncNode, rpcMsg); return ret; } -static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg) { +static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) { int32_t ret = 0; return ret; } -static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg) { +static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) { int32_t ret = 0; return ret; } -static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg) { +static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { + SEpSet epSet; + syncUtilraftId2EpSet(destRaftId, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + +static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { + SEpSet epSet; + syncUtilnodeInfo2EpSet(nodeInfo, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + +static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { int32_t ret = 0; return ret; } -static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg) { +static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { int32_t ret = 0; return ret; } -static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg) { +static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { int32_t ret = 0; return ret; } -static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg) { +static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t ret = 0; return ret; } -static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg) { +static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t ret = 0; return ret; } -static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg) { +static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; return ret; } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 89373037250aaf22d0c3f0ef06d8f9c27409d59c..baff4ed50b82150cc3e2a99660d2401d23ced6f6 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -15,5 +15,133 @@ #include "syncMessage.h" #include "syncRaft.h" +#include "tcoding.h" -void onMessage(SRaft *pRaft, void *pMsg) {} \ No newline at end of file +void onMessage(SRaft* pRaft, void* pMsg) {} + +// ---- message build ---- +SyncPing* syncPingBuild(uint32_t dataLen) { + uint32_t bytes = SYNC_PING_FIX_LEN + dataLen; + SyncPing* pSyncPing = malloc(bytes); + memset(pSyncPing, 0, bytes); + pSyncPing->bytes = bytes; + pSyncPing->msgType = SYNC_PING; + pSyncPing->dataLen = dataLen; +} + +void syncPingDestroy(SyncPing* pSyncPing) { + if (pSyncPing != NULL) { + free(pSyncPing); + } +} + +void syncPingSerialize(const SyncPing* pSyncPing, char* buf, uint32_t bufLen) { + assert(pSyncPing->bytes <= bufLen); + memcpy(buf, pSyncPing, pSyncPing->bytes); +} + +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pSyncPing) { + uint32_t* pU32 = (uint32_t*)buf; + uint32_t bytes = *pU32; + pSyncPing = (SyncPing*)malloc(bytes); + memcpy(pSyncPing, buf, len); + assert(len == pSyncPing->bytes); + assert(pSyncPing->bytes == SYNC_PING_FIX_LEN + pSyncPing->dataLen); +} + +void syncPing2RpcMsg(const SyncPing* pSyncPing, SRpcMsg* pRpcMsg) { + pRpcMsg->msgType = pSyncPing->msgType; + uint32_t bufLen = pSyncPing->bytes; + char* buf = malloc(bufLen); + syncPingSerialize(pSyncPing, buf, bufLen); + pRpcMsg->contLen = bufLen; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + memcpy(pRpcMsg->pCont, buf, pRpcMsg->contLen); + free(buf); +} + +/* +typedef struct SRaftId { + SyncNodeId addr; // typedef uint64_t SyncNodeId; + SyncGroupId vgId; // typedef int32_t SyncGroupId; +} SRaftId; + +typedef struct SyncPing { + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + uint32_t dataLen; + char* data; +} SyncPing; +*/ + +/* +void syncPingSerialize(const SyncPing* pSyncPing, char** ppBuf, uint32_t* bufLen) { + *bufLen = sizeof(SyncPing) + pSyncPing->dataLen; + *ppBuf = (char*)malloc(*bufLen); + void* pStart = *ppBuf; + uint32_t allBytes = *bufLen; + + int len = 0; + len = taosEncodeFixedU32(&pStart, pSyncPing->msgType); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedU64(&pStart, pSyncPing->srcId.addr); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedI32(&pStart, pSyncPing->srcId.vgId); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedU64(&pStart, pSyncPing->destId.addr); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedI32(&pStart, pSyncPing->destId.vgId); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedU32(&pStart, pSyncPing->dataLen); + allBytes -= len; + assert(len > 0); + pStart += len; + + memcpy(pStart, pSyncPing->data, pSyncPing->dataLen); + allBytes -= pSyncPing->dataLen; + assert(allBytes == 0); +} + + +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pSyncPing) { + void* pStart = (void*)buf; + uint64_t u64; + int32_t i32; + uint32_t u32; + + pStart = taosDecodeFixedU64(pStart, &u64); + pSyncPing->msgType = u64; + + pStart = taosDecodeFixedU64(pStart, &u64); + pSyncPing->srcId.addr = u64; + + pStart = taosDecodeFixedI32(pStart, &i32); + pSyncPing->srcId.vgId = i32; + + pStart = taosDecodeFixedU64(pStart, &u64); + pSyncPing->destId.addr = u64; + + pStart = taosDecodeFixedI32(pStart, &i32); + pSyncPing->destId.vgId = i32; + + pStart = taosDecodeFixedU32(pStart, &u32); + pSyncPing->dataLen = u32; +} +*/ \ No newline at end of file diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index d68b1def4d0c94597f5aeb8c6ec8f47d4ea94967..080840bbf6403d8cfb0400d0a5e54ab82ba87953 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -14,9 +14,64 @@ */ #include "syncUtil.h" +#include +#include +#include -void nodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet) {} +// ---- encode / decode +uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { + uint64_t u64; + uint32_t hostU32 = (uint32_t)inet_addr(host); + assert(hostU32 != (uint32_t)-1); + u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16); + return u64; +} -void raftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {} +void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port) { + uint32_t hostU32 = (uint32_t)((u64 >> 32) & 0x00000000FFFFFFFF); -void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) {} \ No newline at end of file + struct in_addr addr; + addr.s_addr = hostU32; + snprintf(host, len, "%s", inet_ntoa(addr)); + *port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16); +} + +void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet) { + pEpSet->inUse = 0; + addEpIntoEpSet(pEpSet, pNodeInfo->nodeFqdn, pNodeInfo->nodePort); +} + +void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) { + char host[TSDB_FQDN_LEN]; + uint16_t port; + + syncUtilU642Addr(raftId->addr, host, sizeof(host), &port); + pEpSet->inUse = 0; + addEpIntoEpSet(pEpSet, host, port); +} + +void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) { + raftId->addr = syncUtilAddr2U64(pNodeInfo->nodeFqdn, pNodeInfo->nodePort); + raftId->vgId = vgId; +} + +// ---- SSyncBuffer ----- +#if 0 +void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) { + syncBuf->len = len; + syncBuf->data = malloc(syncBuf->len); +} + +void syncUtilbufDestroy(SSyncBuffer* syncBuf) { free(syncBuf->data); } + +void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest) { + dest->len = src->len; + dest->data = src->data; +} + +void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { + dest->len = src->len; + dest->data = malloc(dest->len); + memcpy(dest->data, src->data, dest->len); +} +#endif \ No newline at end of file