diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 019c291efc03a59e56fae223717718cf0a416de1..85a82dcfb70ffd7431e41c14d81263344ec68ddb 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -39,7 +39,6 @@ extern "C" { // /\ UNCHANGED <> // int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode); - int32_t syncNodeElect(SSyncNode* pSyncNode); int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg); diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 352d30c8d72b131b934f430971942fa718d54fd4..09e93fda1cd4c4eb4ae764e2e451f3331d9df7cb 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -34,11 +34,11 @@ extern "C" { typedef struct SSyncIO { STaosQueue *pMsgQ; - STaosQset * pQset; + STaosQset *pQset; pthread_t consumerTid; - void * serverRpc; - void * clientRpc; + void *serverRpc; + void *clientRpc; SEpSet myAddr; tmr_h qTimer; @@ -50,6 +50,7 @@ typedef struct SSyncIO { void *pSyncNode; int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg); int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg); + int32_t (*FpOnSyncClientRequest)(SSyncNode *pSyncNode, SyncClientRequest *pMsg); int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg); int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg); int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 15c719b76e08d27dda223275811f96433550ba84..5a9af83827b6d952ea36165d9a97524708c1c985 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -70,6 +70,9 @@ extern "C" { struct SyncTimeout; typedef struct SyncTimeout SyncTimeout; +struct SyncClientRequest; +typedef struct SyncClientRequest SyncClientRequest; + struct SyncPing; typedef struct SyncPing SyncPing; @@ -185,6 +188,7 @@ typedef struct SSyncNode { // callback int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg); int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg); + int32_t (*FpOnClientRequest)(SSyncNode* ths, SyncClientRequest* pMsg); int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg); int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index aca6205b9dfd75d044ceb8156fb3683af45f097b..6fe18dae384551846cc0eff8ba5b33b413a16e19 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -52,7 +52,6 @@ extern "C" { // /\ UNCHANGED <> // int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); - int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index c307ec5068f5cd3a74fdba70a4a1f80b5a55980f..8176ac417a164ebcf06eb54b34a7219c07e64cb9 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io); -static void * syncIOConsumerFunc(void *param); +static void *syncIOConsumerFunc(void *param); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); @@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) { } static void *syncIOConsumerFunc(void *param) { - SSyncIO * io = param; + SSyncIO *io = param; STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; qall = taosAllocateQall(); while (1) { @@ -273,6 +273,13 @@ static void *syncIOConsumerFunc(void *param) { syncPingReplyDestroy(pSyncMsg); } + } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) { + if (io->FpOnSyncClientRequest != NULL) { + SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); + io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg); + syncClientRequestDestroy(pSyncMsg); + } + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { if (io->FpOnSyncRequestVote != NULL) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index aaf6535f402fd1757433ca9253a5b04b28f9da0b..dd2c1421048cc8498da9d801611548c1123489a2 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -42,6 +42,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); // on message ---- static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); +static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg); // --------------------------------- int32_t syncInit() { @@ -192,6 +193,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init callback pSyncNode->FpOnPing = syncNodeOnPingCb; pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; + pSyncNode->FpOnClientRequest = syncNodeOnClientRequestCb; pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; @@ -696,3 +698,19 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg); return ret; } + +static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { + int32_t ret = 0; + syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); + + if (ths->state == TAOS_SYNC_STATE_LEADER) { + SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + ths->pLogStore->appendEntry(ths->pLogStore, pEntry); + syncNodeReplicate(ths); + syncEntryDestory(pEntry); + } else { + // ths->pFsm->FpCommitCb(-1) + } + + return ret; +} \ No newline at end of file