未验证 提交 55380ff8 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #10753 from taosdata/feature/3.0_mhli

Feature/3.0 mhli
...@@ -39,7 +39,6 @@ extern "C" { ...@@ -39,7 +39,6 @@ extern "C" {
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// //
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode); int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode);
int32_t syncNodeElect(SSyncNode* pSyncNode); int32_t syncNodeElect(SSyncNode* pSyncNode);
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg); int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg);
......
...@@ -34,11 +34,11 @@ extern "C" { ...@@ -34,11 +34,11 @@ extern "C" {
typedef struct SSyncIO { typedef struct SSyncIO {
STaosQueue *pMsgQ; STaosQueue *pMsgQ;
STaosQset * pQset; STaosQset *pQset;
pthread_t consumerTid; pthread_t consumerTid;
void * serverRpc; void *serverRpc;
void * clientRpc; void *clientRpc;
SEpSet myAddr; SEpSet myAddr;
tmr_h qTimer; tmr_h qTimer;
...@@ -50,6 +50,7 @@ typedef struct SSyncIO { ...@@ -50,6 +50,7 @@ typedef struct SSyncIO {
void *pSyncNode; void *pSyncNode;
int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg); int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg);
int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg); int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg);
int32_t (*FpOnSyncClientRequest)(SSyncNode *pSyncNode, SyncClientRequest *pMsg);
int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg); int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg);
int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg); int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg);
int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg);
......
...@@ -70,6 +70,9 @@ extern "C" { ...@@ -70,6 +70,9 @@ extern "C" {
struct SyncTimeout; struct SyncTimeout;
typedef struct SyncTimeout SyncTimeout; typedef struct SyncTimeout SyncTimeout;
struct SyncClientRequest;
typedef struct SyncClientRequest SyncClientRequest;
struct SyncPing; struct SyncPing;
typedef struct SyncPing SyncPing; typedef struct SyncPing SyncPing;
...@@ -185,6 +188,7 @@ typedef struct SSyncNode { ...@@ -185,6 +188,7 @@ typedef struct SSyncNode {
// callback // callback
int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg); int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg);
int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg); int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg);
int32_t (*FpOnClientRequest)(SSyncNode* ths, SyncClientRequest* pMsg);
int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg); int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg);
int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg);
......
...@@ -52,7 +52,6 @@ extern "C" { ...@@ -52,7 +52,6 @@ extern "C" {
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// //
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeReplicate(SSyncNode* pSyncNode);
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
......
...@@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io); ...@@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io);
static void * syncIOConsumerFunc(void *param); static void *syncIOConsumerFunc(void *param);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
...@@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) { ...@@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
} }
static void *syncIOConsumerFunc(void *param) { static void *syncIOConsumerFunc(void *param) {
SSyncIO * io = param; SSyncIO *io = param;
STaosQall *qall; STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
qall = taosAllocateQall(); qall = taosAllocateQall();
while (1) { while (1) {
...@@ -273,6 +273,13 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -273,6 +273,13 @@ static void *syncIOConsumerFunc(void *param) {
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} }
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) {
if (io->FpOnSyncClientRequest != NULL) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
if (io->FpOnSyncRequestVote != NULL) { if (io->FpOnSyncRequestVote != NULL) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
......
...@@ -42,6 +42,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); ...@@ -42,6 +42,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
// on message ---- // on message ----
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
// --------------------------------- // ---------------------------------
int32_t syncInit() { int32_t syncInit() {
...@@ -192,6 +193,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -192,6 +193,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
// init callback // init callback
pSyncNode->FpOnPing = syncNodeOnPingCb; pSyncNode->FpOnPing = syncNodeOnPingCb;
pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
pSyncNode->FpOnClientRequest = syncNodeOnClientRequestCb;
pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
...@@ -696,3 +698,19 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { ...@@ -696,3 +698,19 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg); syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg);
return ret; return ret;
} }
static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
int32_t ret = 0;
syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);
if (ths->state == TAOS_SYNC_STATE_LEADER) {
SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
syncNodeReplicate(ths);
syncEntryDestory(pEntry);
} else {
// ths->pFsm->FpCommitCb(-1)
}
return ret;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册