提交 d5732003 编写于 作者: M Minghao Li

sync ping

上级 eaf6142c
...@@ -71,15 +71,15 @@ typedef struct SSyncFSM { ...@@ -71,15 +71,15 @@ typedef struct SSyncFSM {
// when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result // when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result
// user can do something according to the code and isWeak. for example, write data into tsdb // user can do something according to the code and isWeak. for example, write data into tsdb
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code);
// when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result // when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result
// user can do something according to the code and isWeak. for example, write data into tsdb // user can do something according to the code and isWeak. for example, write data into tsdb
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code);
// when log entry is updated by a new one, FpRollBackCb is called // when log entry is updated by a new one, FpRollBackCb is called
// user can do something to roll back. for example, delete data from tsdb, or just ignore it // user can do something to roll back. for example, delete data from tsdb, or just ignore it
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code);
// user should implement this function, use "data" to take snapshot into "snapshot" // user should implement this function, use "data" to take snapshot into "snapshot"
int32_t (*FpTakeSnapshot)(SSnapshot* snapshot); int32_t (*FpTakeSnapshot)(SSnapshot* snapshot);
...@@ -95,10 +95,10 @@ typedef struct SSyncLogStore { ...@@ -95,10 +95,10 @@ typedef struct SSyncLogStore {
void* data; void* data;
// append one log entry // append one log entry
int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pBuf);
// get one log entry, user need to free pBuf->data // get one log entry, user need to free pBuf->data
int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf); int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pBuf);
// update log store commit index with "index" // update log store commit index with "index"
int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index);
...@@ -153,8 +153,8 @@ int64_t syncStart(const SSyncInfo* pSyncInfo); ...@@ -153,8 +153,8 @@ int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid); void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
// int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak); // int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak);
ESyncState syncGetMyRole(int64_t rid); ESyncState syncGetMyRole(int64_t rid);
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole);
......
...@@ -92,8 +92,8 @@ struct SSyncEnv; ...@@ -92,8 +92,8 @@ struct SSyncEnv;
typedef struct SSyncEnv SSyncEnv; typedef struct SSyncEnv SSyncEnv;
typedef struct SRaftId { typedef struct SRaftId {
SyncNodeId addr; SyncNodeId addr; // typedef uint64_t SyncNodeId;
SyncGroupId vgId; SyncGroupId vgId; // typedef int32_t SyncGroupId;
} SRaftId; } SRaftId;
typedef struct SSyncNode { typedef struct SSyncNode {
...@@ -133,17 +133,17 @@ typedef struct SSyncNode { ...@@ -133,17 +133,17 @@ typedef struct SSyncNode {
uint64_t heartbeatTimerCounter; uint64_t heartbeatTimerCounter;
// callback // callback
int32_t (*FpOnPing)(struct SSyncNode* ths, SyncPing* pMsg); int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg);
int32_t (*FpOnPingReply)(struct SSyncNode* ths, SyncPingReply* pMsg); int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg);
int32_t (*FpOnRequestVote)(struct SSyncNode* ths, SyncRequestVote* pMsg); int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg);
int32_t (*FpOnRequestVoteReply)(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t (*FpOnAppendEntries)(struct SSyncNode* ths, SyncAppendEntries* pMsg); int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
// passed from outside // passed from outside
void* rpcClient; void* rpcClient;
......
...@@ -27,6 +27,7 @@ extern "C" { ...@@ -27,6 +27,7 @@ extern "C" {
#include "syncRaftEntry.h" #include "syncRaftEntry.h"
#include "taosdef.h" #include "taosdef.h"
// encode as uint64
typedef enum ESyncMessageType { typedef enum ESyncMessageType {
SYNC_PING = 0, SYNC_PING = 0,
SYNC_PING_REPLY, SYNC_PING_REPLY,
...@@ -38,29 +39,47 @@ typedef enum ESyncMessageType { ...@@ -38,29 +39,47 @@ typedef enum ESyncMessageType {
SYNC_APPEND_ENTRIES_REPLY, SYNC_APPEND_ENTRIES_REPLY,
} ESyncMessageType; } ESyncMessageType;
/*
typedef struct SRaftId {
SyncNodeId addr; // typedef uint64_t SyncNodeId;
SyncGroupId vgId; // typedef int32_t SyncGroupId;
} SRaftId;
*/
typedef struct SyncPing { typedef struct SyncPing {
ESyncMessageType msgType; uint32_t bytes;
const SSyncBuffer *pData; uint32_t msgType;
} SyncPing, RaftPing; SRaftId srcId;
SRaftId destId;
uint32_t dataLen;
char* data;
} SyncPing;
#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
typedef struct SyncPingReply { typedef struct SyncPingReply {
ESyncMessageType msgType; uint32_t bytes;
const SSyncBuffer *pData; uint32_t msgType;
} SyncPingReply, RaftPingReply; SRaftId srcId;
SRaftId destId;
uint32_t dataLen;
char* data;
} SyncPingReply;
typedef struct SyncClientRequest { typedef struct SyncClientRequest {
ESyncMessageType msgType; ESyncMessageType msgType;
const SSyncBuffer *pData; char* data;
int64_t seqNum; uint32_t dataLen;
bool isWeak; int64_t seqNum;
} SyncClientRequest, RaftClientRequest; bool isWeak;
} SyncClientRequest;
typedef struct SyncClientRequestReply { typedef struct SyncClientRequestReply {
ESyncMessageType msgType; ESyncMessageType msgType;
int32_t errCode; int32_t errCode;
const SSyncBuffer *pErrMsg; SSyncBuffer* pErrMsg;
const SSyncBuffer *pLeaderHint; SSyncBuffer* pLeaderHint;
} SyncClientRequestReply, RaftClientRequestReply; } SyncClientRequestReply;
typedef struct SyncRequestVote { typedef struct SyncRequestVote {
ESyncMessageType msgType; ESyncMessageType msgType;
...@@ -69,7 +88,7 @@ typedef struct SyncRequestVote { ...@@ -69,7 +88,7 @@ typedef struct SyncRequestVote {
SyncGroupId vgId; SyncGroupId vgId;
SyncIndex lastLogIndex; SyncIndex lastLogIndex;
SyncTerm lastLogTerm; SyncTerm lastLogTerm;
} SyncRequestVote, RaftRequestVote; } SyncRequestVote;
typedef struct SyncRequestVoteReply { typedef struct SyncRequestVoteReply {
ESyncMessageType msgType; ESyncMessageType msgType;
...@@ -77,7 +96,7 @@ typedef struct SyncRequestVoteReply { ...@@ -77,7 +96,7 @@ typedef struct SyncRequestVoteReply {
SyncNodeId nodeId; SyncNodeId nodeId;
SyncGroupId vgId; SyncGroupId vgId;
bool voteGranted; bool voteGranted;
} SyncRequestVoteReply, RaftRequestVoteReply; } SyncRequestVoteReply;
typedef struct SyncAppendEntries { typedef struct SyncAppendEntries {
ESyncMessageType msgType; ESyncMessageType msgType;
...@@ -86,9 +105,9 @@ typedef struct SyncAppendEntries { ...@@ -86,9 +105,9 @@ typedef struct SyncAppendEntries {
SyncIndex prevLogIndex; SyncIndex prevLogIndex;
SyncTerm prevLogTerm; SyncTerm prevLogTerm;
int32_t entryCount; int32_t entryCount;
SSyncRaftEntry * logEntries; SSyncRaftEntry* logEntries;
SyncIndex commitIndex; SyncIndex commitIndex;
} SyncAppendEntries, RaftAppendEntries; } SyncAppendEntries;
typedef struct SyncAppendEntriesReply { typedef struct SyncAppendEntriesReply {
ESyncMessageType msgType; ESyncMessageType msgType;
...@@ -96,7 +115,18 @@ typedef struct SyncAppendEntriesReply { ...@@ -96,7 +115,18 @@ typedef struct SyncAppendEntriesReply {
SyncNodeId nodeId; SyncNodeId nodeId;
bool success; bool success;
SyncIndex matchIndex; SyncIndex matchIndex;
} SyncAppendEntriesReply, RaftAppendEntriesReply; } SyncAppendEntriesReply;
// ---- message build ----
SyncPing* syncPingBuild(uint32_t dataLen);
void syncPingDestroy(SyncPing* pSyncPing);
void syncPingSerialize(const SyncPing* pSyncPing, char* buf, uint32_t bufLen);
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pSyncPing);
void syncPing2RpcMsg(const SyncPing* pSyncPing, SRpcMsg* pRpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -27,11 +27,28 @@ extern "C" { ...@@ -27,11 +27,28 @@ extern "C" {
#include "syncMessage.h" #include "syncMessage.h"
#include "taosdef.h" #include "taosdef.h"
void nodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); // ---- encode / decode
void raftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); uint64_t syncUtilAddr2U64(const char* host, uint16_t port);
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port);
void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet);
void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId);
// ---- SSyncBuffer ----
#if 0
void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len);
void syncUtilbufDestroy(SSyncBuffer* syncBuf);
void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest);
void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -23,19 +23,20 @@ ...@@ -23,19 +23,20 @@
static int32_t tsNodeRefId = -1; static int32_t tsNodeRefId = -1;
// ------ local funciton --------- // ------ local funciton ---------
static int32_t doSyncNodeSendMsgById(SRaftId* destRaftId, struct SSyncNode* pSyncNode, SRpcMsg* pMsg); static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t doSyncNodeSendMsgByInfo(SNodeInfo* nodeInfo, struct SSyncNode* pSyncNode, SRpcMsg* pMsg); static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg);
static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg);
static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg);
static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg);
static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg);
static void syncNodePingTimerCb(void* param, void* tmrId); static void syncNodePingTimerCb(void* param, void* tmrId);
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg);
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
// --------------------------------- // ---------------------------------
int32_t syncInit() { int32_t syncInit() {
...@@ -55,7 +56,9 @@ void syncStop(int64_t rid) {} ...@@ -55,7 +56,9 @@ void syncStop(int64_t rid) {}
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; } int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; }
int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; } // int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; }
ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }
...@@ -75,12 +78,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -75,12 +78,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->rpcClient = pSyncInfo->rpcClient;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->FpOnPing = onSyncNodePing; pSyncNode->FpOnPing = syncNodeOnPingCb;
pSyncNode->FpOnPingReply = onSyncNodePingReply; pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
pSyncNode->FpOnRequestVote = onSyncNodeRequestVote; pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
pSyncNode->FpOnRequestVoteReply = onSyncNodeRequestVoteReply; pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
pSyncNode->FpOnAppendEntries = onSyncNodeAppendEntries; pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
pSyncNode->FpOnAppendEntriesReply = onSyncNodeAppendEntriesReply; pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
return pSyncNode; return pSyncNode;
} }
...@@ -92,13 +95,35 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -92,13 +95,35 @@ void syncNodeClose(SSyncNode* pSyncNode) {
void syncNodePingAll(SSyncNode* pSyncNode) { void syncNodePingAll(SSyncNode* pSyncNode) {
sTrace("syncNodePingAll %p ", pSyncNode); sTrace("syncNodePingAll %p ", pSyncNode);
SyncPing msg; int32_t ret = 0;
doSyncNodePing(pSyncNode, &msg); for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
SyncPing* pSyncPing;
SRaftId raftId;
syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &raftId);
ret = syncNodePing(pSyncNode, &raftId, pSyncPing);
assert(ret == 0);
}
} }
void syncNodePingPeers(SSyncNode* pSyncNode) {} void syncNodePingPeers(SSyncNode* pSyncNode) {
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SyncPing* pSyncPing;
SRaftId raftId;
syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &raftId);
ret = syncNodePing(pSyncNode, &raftId, pSyncPing);
assert(ret == 0);
}
}
void syncNodePingSelf(SSyncNode* pSyncNode) {} void syncNodePingSelf(SSyncNode* pSyncNode) {
int32_t ret = 0;
SyncPing* pSyncPing;
SRaftId raftId;
syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &raftId);
ret = syncNodePing(pSyncNode, &raftId, pSyncPing);
assert(ret == 0);
}
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
if (pSyncNode->pPingTimer == NULL) { if (pSyncNode->pPingTimer == NULL) {
...@@ -120,69 +145,64 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { ...@@ -120,69 +145,64 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
} }
// ------ local funciton --------- // ------ local funciton ---------
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
static int32_t doSyncNodeSendMsgById(SRaftId* destRaftId, struct SSyncNode* pSyncNode, SRpcMsg* pMsg) { int32_t ret = 0;
SEpSet epSet; SRpcMsg* rpcMsg;
raftId2EpSet(destRaftId, &epSet); syncPing2RpcMsg(pMsg, rpcMsg);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); syncNodeSendMsgById(destRaftId, pSyncNode, rpcMsg);
return 0;
}
static int32_t doSyncNodeSendMsgByInfo(SNodeInfo* nodeInfo, struct SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
nodeInfo2EpSet(nodeInfo, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) {
int32_t ret;
for (int i = 0; i < ths->syncCfg.replicaNum; ++i) {
SRpcMsg* rpcMsg;
syncPing2RpcMsg(pMsg, rpcMsg);
doSyncNodeSendMsgByInfo(&ths->syncCfg.nodeInfo[i], ths, rpcMsg);
}
return ret; return ret;
} }
static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg) { static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) {
int32_t ret = 0; int32_t ret = 0;
return ret; return ret;
} }
static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg) { static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) {
int32_t ret = 0; int32_t ret = 0;
return ret; return ret;
} }
static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg) { static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
int32_t ret = 0; int32_t ret = 0;
return ret; return ret;
} }
static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg) { static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
int32_t ret = 0; int32_t ret = 0;
return ret; return ret;
} }
static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg) { static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = 0; int32_t ret = 0;
return ret; return ret;
} }
static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg) { static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0; int32_t ret = 0;
return ret; return ret;
} }
static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg) { static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0; int32_t ret = 0;
return ret; return ret;
} }
static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg) { static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0; int32_t ret = 0;
return ret; return ret;
} }
......
...@@ -15,5 +15,133 @@ ...@@ -15,5 +15,133 @@
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "tcoding.h"
void onMessage(SRaft *pRaft, void *pMsg) {} void onMessage(SRaft* pRaft, void* pMsg) {}
\ No newline at end of file
// ---- message build ----
SyncPing* syncPingBuild(uint32_t dataLen) {
uint32_t bytes = SYNC_PING_FIX_LEN + dataLen;
SyncPing* pSyncPing = malloc(bytes);
memset(pSyncPing, 0, bytes);
pSyncPing->bytes = bytes;
pSyncPing->msgType = SYNC_PING;
pSyncPing->dataLen = dataLen;
}
void syncPingDestroy(SyncPing* pSyncPing) {
if (pSyncPing != NULL) {
free(pSyncPing);
}
}
void syncPingSerialize(const SyncPing* pSyncPing, char* buf, uint32_t bufLen) {
assert(pSyncPing->bytes <= bufLen);
memcpy(buf, pSyncPing, pSyncPing->bytes);
}
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pSyncPing) {
uint32_t* pU32 = (uint32_t*)buf;
uint32_t bytes = *pU32;
pSyncPing = (SyncPing*)malloc(bytes);
memcpy(pSyncPing, buf, len);
assert(len == pSyncPing->bytes);
assert(pSyncPing->bytes == SYNC_PING_FIX_LEN + pSyncPing->dataLen);
}
void syncPing2RpcMsg(const SyncPing* pSyncPing, SRpcMsg* pRpcMsg) {
pRpcMsg->msgType = pSyncPing->msgType;
uint32_t bufLen = pSyncPing->bytes;
char* buf = malloc(bufLen);
syncPingSerialize(pSyncPing, buf, bufLen);
pRpcMsg->contLen = bufLen;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
memcpy(pRpcMsg->pCont, buf, pRpcMsg->contLen);
free(buf);
}
/*
typedef struct SRaftId {
SyncNodeId addr; // typedef uint64_t SyncNodeId;
SyncGroupId vgId; // typedef int32_t SyncGroupId;
} SRaftId;
typedef struct SyncPing {
uint32_t bytes;
uint32_t msgType;
SRaftId srcId;
SRaftId destId;
uint32_t dataLen;
char* data;
} SyncPing;
*/
/*
void syncPingSerialize(const SyncPing* pSyncPing, char** ppBuf, uint32_t* bufLen) {
*bufLen = sizeof(SyncPing) + pSyncPing->dataLen;
*ppBuf = (char*)malloc(*bufLen);
void* pStart = *ppBuf;
uint32_t allBytes = *bufLen;
int len = 0;
len = taosEncodeFixedU32(&pStart, pSyncPing->msgType);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedU64(&pStart, pSyncPing->srcId.addr);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedI32(&pStart, pSyncPing->srcId.vgId);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedU64(&pStart, pSyncPing->destId.addr);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedI32(&pStart, pSyncPing->destId.vgId);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedU32(&pStart, pSyncPing->dataLen);
allBytes -= len;
assert(len > 0);
pStart += len;
memcpy(pStart, pSyncPing->data, pSyncPing->dataLen);
allBytes -= pSyncPing->dataLen;
assert(allBytes == 0);
}
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pSyncPing) {
void* pStart = (void*)buf;
uint64_t u64;
int32_t i32;
uint32_t u32;
pStart = taosDecodeFixedU64(pStart, &u64);
pSyncPing->msgType = u64;
pStart = taosDecodeFixedU64(pStart, &u64);
pSyncPing->srcId.addr = u64;
pStart = taosDecodeFixedI32(pStart, &i32);
pSyncPing->srcId.vgId = i32;
pStart = taosDecodeFixedU64(pStart, &u64);
pSyncPing->destId.addr = u64;
pStart = taosDecodeFixedI32(pStart, &i32);
pSyncPing->destId.vgId = i32;
pStart = taosDecodeFixedU32(pStart, &u32);
pSyncPing->dataLen = u32;
}
*/
\ No newline at end of file
...@@ -14,9 +14,64 @@ ...@@ -14,9 +14,64 @@
*/ */
#include "syncUtil.h" #include "syncUtil.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
void nodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet) {} // ---- encode / decode
uint64_t syncUtilAddr2U64(const char* host, uint16_t port) {
uint64_t u64;
uint32_t hostU32 = (uint32_t)inet_addr(host);
assert(hostU32 != (uint32_t)-1);
u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16);
return u64;
}
void raftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {} void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port) {
uint32_t hostU32 = (uint32_t)((u64 >> 32) & 0x00000000FFFFFFFF);
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) {} struct in_addr addr;
\ No newline at end of file addr.s_addr = hostU32;
snprintf(host, len, "%s", inet_ntoa(addr));
*port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16);
}
void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet) {
pEpSet->inUse = 0;
addEpIntoEpSet(pEpSet, pNodeInfo->nodeFqdn, pNodeInfo->nodePort);
}
void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
char host[TSDB_FQDN_LEN];
uint16_t port;
syncUtilU642Addr(raftId->addr, host, sizeof(host), &port);
pEpSet->inUse = 0;
addEpIntoEpSet(pEpSet, host, port);
}
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) {
raftId->addr = syncUtilAddr2U64(pNodeInfo->nodeFqdn, pNodeInfo->nodePort);
raftId->vgId = vgId;
}
// ---- SSyncBuffer -----
#if 0
void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) {
syncBuf->len = len;
syncBuf->data = malloc(syncBuf->len);
}
void syncUtilbufDestroy(SSyncBuffer* syncBuf) { free(syncBuf->data); }
void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest) {
dest->len = src->len;
dest->data = src->data;
}
void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
dest->len = src->len;
dest->data = malloc(dest->len);
memcpy(dest->data, src->data, dest->len);
}
#endif
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册