未验证 提交 fee43064 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17812 from taosdata/fix/TD-20052

enh: refact sync callback func
......@@ -132,27 +132,27 @@ typedef struct SSnapshotMeta {
typedef struct SSyncFSM {
void* data;
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta *pMeta);
void (*FpPreCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta);
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm);
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SReConfigCbMeta* pMeta);
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpBecomeLeaderCb)(struct SSyncFSM* pFsm);
void (*FpBecomeFollowerCb)(struct SSyncFSM* pFsm);
void (*FpBecomeLeaderCb)(const struct SSyncFSM* pFsm);
void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpGetSnapshot)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
int32_t (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader);
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
int32_t (*FpSnapshotStartRead)(const struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader);
int32_t (*FpSnapshotStopRead)(const struct SSyncFSM* pFsm, void* pReader);
int32_t (*FpSnapshotDoRead)(const struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter);
int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot);
int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
int32_t (*FpSnapshotStartWrite)(const struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter);
int32_t (*FpSnapshotStopWrite)(const struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot);
int32_t (*FpSnapshotDoWrite)(const struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
} SSyncFSM;
......@@ -193,9 +193,13 @@ typedef struct SSyncInfo {
SWal* pWal;
SSyncFSM* pFsm;
SMsgCb* msgcb;
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t pingMs;
int32_t electMs;
int32_t heartbeatMs;
int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
} SSyncInfo;
int32_t syncInit();
......@@ -228,6 +232,8 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
const char* syncUtilState2String(ESyncState state);
#ifdef __cplusplus
}
#endif
......
......@@ -28,22 +28,11 @@ typedef struct SRaftId {
SyncGroupId vgId;
} SRaftId;
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg);
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo);
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb);
char* sync2SimpleStr(int64_t rid);
// set timer ms
void setPingTimerMS(int64_t rid, int32_t pingTimerMS);
void setElectTimerMS(int64_t rid, int32_t electTimerMS);
void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS);
// for compatibility, the same as syncPropose
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak);
// utils
const char* syncUtilState2String(ESyncState state);
// ------------------ for debug -------------------
void syncRpcMsgPrint(SRpcMsg* pMsg);
void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
......
......@@ -20,7 +20,6 @@
#include "sdb.h"
#include "sync.h"
#include "syncTools.h"
#include "tcache.h"
#include "tdatablock.h"
#include "tglobal.h"
......@@ -90,7 +89,6 @@ typedef struct {
int32_t errCode;
int32_t transId;
SRWLatch lock;
int8_t leaderTransferFinish;
int8_t selfIndex;
int8_t numOfReplicas;
SReplica replicas[TSDB_MAX_REPLICA];
......
......@@ -428,18 +428,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
void mndPreClose(SMnode *pMnode) {
if (pMnode != NULL) {
atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
syncLeaderTransfer(pMnode->syncMgmt.sync);
#if 0
mInfo("vgId:1, mnode start leader transfer");
// wait for leader transfer finish
while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
taosMsleep(10);
mInfo("vgId:1, mnode waiting for leader transfer");
}
mInfo("vgId:1, mnode finish leader transfer");
#endif
}
}
......
......@@ -72,25 +72,25 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code;
}
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
void mndSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SMnode *pMnode = pFsm->data;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont;
// delete msg handle
SRpcMsg rpcMsg = {0};
syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info = pMsg->info;
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
pMgmt->errCode = cbMeta.code;
pMgmt->errCode = pMeta->code;
mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
" role:%s raw:%p",
transId, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex, syncStr(cbMeta.state),
transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
pRaw);
if (pMgmt->errCode == 0) {
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
}
taosWLockLatch(&pMgmt->lock);
......@@ -120,7 +120,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
}
}
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
mInfo("start to read snapshot from sdb in atomic way");
SMnode *pMnode = pFsm->data;
return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
......@@ -128,13 +128,13 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pR
return 0;
}
int32_t mndSyncGetSnapshotInfo(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
SMnode *pMnode = pFsm->data;
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
return 0;
}
void mndRestoreFinish(struct SSyncFSM *pFsm) {
void mndRestoreFinish(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
if (!pMnode->deploy) {
......@@ -146,32 +146,30 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
}
}
void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {}
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
mInfo("start to read snapshot from sdb");
SMnode *pMnode = pFsm->data;
return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
}
int32_t mndSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
int32_t mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
mInfo("stop to read snapshot from sdb");
SMnode *pMnode = pFsm->data;
return sdbStopRead(pMnode->pSdb, pReader);
}
int32_t mndSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
SMnode *pMnode = pFsm->data;
return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len);
}
int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) {
int32_t mndSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
mInfo("start to apply snapshot to sdb");
SMnode *pMnode = pFsm->data;
return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
}
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
int32_t mndSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
SMnode *pMnode = pFsm->data;
......@@ -179,18 +177,12 @@ int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply,
pSnapshot->lastConfigIndex);
}
int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
SMnode *pMnode = pFsm->data;
return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len);
}
void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SMnode *pMnode = pFsm->data;
atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 1);
mInfo("vgId:1, mnode leader transfer finish");
}
static void mndBecomeFollower(struct SSyncFSM *pFsm) {
static void mndBecomeFollower(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
mInfo("vgId:1, become follower");
......@@ -205,7 +197,7 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) {
taosWUnLockLatch(&pMnode->syncMgmt.lock);
}
static void mndBecomeLeader(struct SSyncFSM *pFsm) {
static void mndBecomeLeader(const SSyncFSM *pFsm) {
mInfo("vgId:1, become leader");
SMnode *pMnode = pFsm->data;
}
......@@ -217,8 +209,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL;
pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpLeaderTransferCb = mndLeaderTransfer;
pFsm->FpReConfigCb = mndReConfig;
pFsm->FpLeaderTransferCb = NULL;
pFsm->FpReConfigCb = NULL;
pFsm->FpBecomeLeaderCb = mndBecomeLeader;
pFsm->FpBecomeFollowerCb = mndBecomeFollower;
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
......@@ -242,10 +234,13 @@ int32_t mndInitSync(SMnode *pMnode) {
.batchSize = 1,
.vgId = 1,
.pWal = pMnode->pWal,
.msgcb = NULL,
.FpSendMsg = mndSyncSendMsg,
.FpEqMsg = mndSyncEqMsg,
.FpEqCtrlMsg = mndSyncEqCtrlMsg,
.msgcb = &pMnode->msgCb,
.syncSendMSg = mndSyncSendMsg,
.syncEqMsg = mndSyncEqMsg,
.syncEqCtrlMsg = mndSyncEqCtrlMsg,
.pingMs = 5000,
.electMs = 3000,
.heartbeatMs = 500,
};
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
......@@ -269,11 +264,6 @@ int32_t mndInitSync(SMnode *pMnode) {
return -1;
}
// decrease election timer
setPingTimerMS(pMgmt->sync, 5000);
setElectTimerMS(pMgmt->sync, 3000);
setHeartbeatTimerMS(pMgmt->sync, 500);
mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
return 0;
}
......@@ -289,32 +279,26 @@ void mndCleanupSync(SMnode *pMnode) {
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
if (req.contLen <= 0) {
terrno = TSDB_CODE_APP_ERROR;
return -1;
}
pMgmt->errCode = 0;
SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
req.pCont = rpcMallocCont(req.contLen);
if (req.pCont == NULL) return -1;
memcpy(req.pCont, pRaw, req.contLen);
pMgmt->errCode = 0;
taosWLockLatch(&pMgmt->lock);
if (pMgmt->transId != 0) {
mError("trans:%d, can't be proposed since trans:%d alrady waiting for confirm", transId, pMgmt->transId);
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
taosWUnLockLatch(&pMgmt->lock);
terrno = TSDB_CODE_APP_NOT_READY;
return -1;
} else {
pMgmt->transId = transId;
mInfo("trans:%d, will be proposed", pMgmt->transId);
taosWUnLockLatch(&pMgmt->lock);
}
const bool isWeak = false;
int32_t code = syncPropose(pMgmt->sync, &req, isWeak);
mInfo("trans:%d, will be proposed", transId);
pMgmt->transId = transId;
taosWUnLockLatch(&pMgmt->lock);
int32_t code = syncPropose(pMgmt->sync, &req, false);
if (code == 0) {
mInfo("trans:%d, is proposing and wait sem", pMgmt->transId);
tsem_wait(&pMgmt->syncSem);
......@@ -327,8 +311,8 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
code = 0;
} else {
taosWLockLatch(&pMgmt->lock);
mInfo("trans:%d, failed to proposed since %s", transId, terrstr());
taosWLockLatch(&pMgmt->lock);
pMgmt->transId = 0;
taosWUnLockLatch(&pMgmt->lock);
if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
......@@ -344,13 +328,12 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
return code;
}
if (pMgmt->errCode != 0) terrno = pMgmt->errCode;
terrno = pMgmt->errCode;
return pMgmt->errCode;
}
void mndSyncStart(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
syncStart(pMgmt->sync);
mInfo("vgId:1, sync started, id:%" PRId64, pMgmt->sync);
}
......
......@@ -17,7 +17,6 @@
#define _TD_VND_H_
#include "sync.h"
#include "syncTools.h"
#include "ttrace.h"
#include "vnodeInt.h"
......
......@@ -237,7 +237,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t code = syncProcessMsg(pVnode->sync, pMsg);
if (code != 0) {
vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg,
TMSG_INFO(pMsg->msgType), terrstr());
TMSG_INFO(pMsg->msgType), terrstr());
}
return code;
......@@ -290,78 +290,61 @@ static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code;
}
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
vnodeGetSnapshot(pFsm->data, pSnapshot);
return 0;
}
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {}
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
if (cbMeta.isWeak == 0) {
SVnode *pVnode = pFsm->data;
static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data;
if (cbMeta.code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
if (pMeta->code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
rpcMsg.info = pMsg->info;
rpcMsg.info.conn.applyIndex = pMeta->index;
rpcMsg.info.conn.applyTerm = pMeta->term;
vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
", weak:%d, code:%d, state:%d %s, type:%s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak,
cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));
syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak,
pMeta->code, pMeta->state, syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType));
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync),
TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = pMeta->code, .info = pMsg->info};
vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync),
TMSG_INFO(pMsg->msgType), pMeta->index, pMeta->code, tstrerror(pMeta->code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
}
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
if (cbMeta.isWeak == 1) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, pre-commit-cb execute error, type:%s, error:0x%x %s", syncGetVgId(pVnode->sync),
TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
if (pMeta->isWeak == 0) {
vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
}
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
if (pMeta->isWeak == 1) {
vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
}
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));
syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state,
syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType));
}
#define USE_TSDB_SNAPSHOT
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam;
......@@ -373,7 +356,7 @@ static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void
#endif
}
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
static int32_t vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapReaderClose(pReader);
......@@ -384,7 +367,7 @@ static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
#endif
}
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
......@@ -403,7 +386,7 @@ static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **
#endif
}
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) {
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam;
......@@ -427,7 +410,7 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
#endif
}
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
......@@ -442,7 +425,7 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool
#endif
}
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len);
......@@ -454,9 +437,7 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *
#endif
}
static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {}
static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
static void vnodeRestoreFinish(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data;
do {
......@@ -476,7 +457,7 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
}
static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, become follower", pVnode->config.vgId);
......@@ -490,7 +471,7 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
taosThreadMutexUnlock(&pVnode->lock);
}
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, become leader", pVnode->config.vgId);
......@@ -512,10 +493,10 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
pFsm->FpLeaderTransferCb = NULL;
pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
pFsm->FpReConfigCb = vnodeSyncReconfig;
pFsm->FpReConfigCb = NULL;
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
......@@ -533,10 +514,13 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
.vgId = pVnode->config.vgId,
.syncCfg = pVnode->config.syncCfg,
.pWal = pVnode->pWal,
.msgcb = NULL,
.FpSendMsg = vnodeSyncSendMsg,
.FpEqMsg = vnodeSyncEqMsg,
.FpEqCtrlMsg = vnodeSyncEqCtrlMsg,
.msgcb = &pVnode->msgCb,
.syncSendMSg = vnodeSyncSendMsg,
.syncEqMsg = vnodeSyncEqMsg,
.syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
.pingMs = 5000,
.electMs = 4000,
.heartbeatMs = 700,
};
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
......@@ -555,15 +539,11 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
return -1;
}
setPingTimerMS(pVnode->sync, 5000);
setElectTimerMS(pVnode->sync, 4000);
setHeartbeatTimerMS(pVnode->sync, 700);
return 0;
}
void vnodeSyncStart(SVnode *pVnode) {
vDebug("vgId:%d, start sync", pVnode->config.vgId);
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
syncStart(pVnode->sync);
}
......
......@@ -107,9 +107,9 @@ typedef struct SSyncNode {
// sync io
SWal* pWal;
const SMsgCb* msgcb;
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
// init internal
SNodeInfo myNodeInfo;
......
......@@ -56,6 +56,12 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) {
return -1;
}
pSyncNode->pingBaseLine = pSyncInfo->pingMs;
pSyncNode->pingTimerMS = pSyncInfo->pingMs;
pSyncNode->electBaseLine = pSyncInfo->electMs;
pSyncNode->hbBaseLine = pSyncInfo->heartbeatMs;
pSyncNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
pSyncNode->msgcb = pSyncInfo->msgcb;
return pSyncNode->rid;
}
......@@ -737,30 +743,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
syncNodeRelease(pSyncNode);
}
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR;
}
ASSERT(rid == pSyncNode->rid);
SRespStub stub;
int32_t ret = syncRespMgrGet(pSyncNode->pSyncRespMgr, index, &stub);
if (ret == 1) {
memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg));
}
syncNodeRelease(pSyncNode);
return ret;
}
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR;
}
ASSERT(rid == pSyncNode->rid);
static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) {
SRespStub stub;
int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub);
if (ret == 1) {
......@@ -768,20 +751,6 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo)
}
sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle);
syncNodeRelease(pSyncNode);
return ret;
}
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
sTrace("syncSetQ get pSyncNode is NULL, rid:%" PRId64, rid);
return;
}
ASSERT(rid == pSyncNode->rid);
pSyncNode->msgcb = msgcb;
syncNodeRelease(pSyncNode);
}
char* sync2SimpleStr(int64_t rid) {
......@@ -797,41 +766,6 @@ char* sync2SimpleStr(int64_t rid) {
return s;
}
void setPingTimerMS(int64_t rid, int32_t pingTimerMS) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return;
}
ASSERT(rid == pSyncNode->rid);
pSyncNode->pingBaseLine = pingTimerMS;
pSyncNode->pingTimerMS = pingTimerMS;
syncNodeRelease(pSyncNode);
}
void setElectTimerMS(int64_t rid, int32_t electTimerMS) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return;
}
ASSERT(rid == pSyncNode->rid);
pSyncNode->electBaseLine = electTimerMS;
syncNodeRelease(pSyncNode);
}
void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return;
}
ASSERT(rid == pSyncNode->rid);
pSyncNode->hbBaseLine = hbTimerMS;
pSyncNode->heartbeatTimerMS = hbTimerMS;
syncNodeRelease(pSyncNode);
}
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
......@@ -928,7 +862,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
}
} else {
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
if (pSyncNode->syncEqMsg != NULL && (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
ret = 0;
} else {
ret = -1;
......@@ -1059,9 +993,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->pWal = pSyncInfo->pWal;
pSyncNode->msgcb = pSyncInfo->msgcb;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
pSyncNode->FpEqCtrlMsg = pSyncInfo->FpEqCtrlMsg;
pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;
// init raft config
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
......@@ -1577,12 +1511,12 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
if (pSyncNode->FpSendMsg != NULL) {
if (pSyncNode->syncSendMSg != NULL) {
// htonl
syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg);
pSyncNode->syncSendMSg(&epSet, pMsg);
} else {
sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
return -1;
......@@ -1594,12 +1528,12 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
if (pSyncNode->FpSendMsg != NULL) {
if (pSyncNode->syncSendMSg != NULL) {
// htonl
syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg);
pSyncNode->syncSendMSg(&epSet, pMsg);
} else {
sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
}
......@@ -1623,13 +1557,13 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg);
cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncSendMSg);
cJSON_AddStringToObject(pRoot, "syncSendMSg", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
cJSON_AddStringToObject(pRoot, "queue", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg);
cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncEqMsg);
cJSON_AddStringToObject(pRoot, "syncEqMsg", u64buf);
// init internal
cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
......@@ -2642,8 +2576,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (pSyncNode->syncEqMsg != NULL) {
int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) {
sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont);
......@@ -2651,7 +2585,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
return;
}
} else {
sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL");
sTrace("syncNodeEqPingTimer pSyncNode->syncEqMsg is NULL");
}
syncTimeoutDestroy(pSyncMsg);
......@@ -2676,8 +2610,8 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
pSyncNode->vgId, pSyncNode);
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
if (pSyncNode->FpEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (pSyncNode->syncEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) {
int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) {
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont);
......@@ -2693,7 +2627,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
} while (0);
} else {
sTrace("syncNodeEqElectTimer FpEqMsg is NULL");
sTrace("syncNodeEqElectTimer syncEqMsg is NULL");
}
syncTimeoutDestroy(pSyncMsg);
......@@ -2725,8 +2659,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (pSyncNode->syncEqMsg != NULL) {
int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) {
sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont);
......@@ -2734,7 +2668,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
return;
}
} else {
sError("vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId);
sError("vgId:%d, enqueue msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
}
syncTimeoutDestroy(pSyncMsg);
......@@ -2781,8 +2715,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
// eq msg
#if 0
if (pSyncNode->FpEqCtrlMsg != NULL) {
int32_t code = pSyncNode->FpEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
if (pSyncNode->syncEqCtrlMsg != NULL) {
int32_t code = pSyncNode->syncEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) {
sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont);
......@@ -2790,7 +2724,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
return;
}
} else {
sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId);
sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
}
#endif
......@@ -2830,10 +2764,10 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
SRpcMsg rpcMsg = {0};
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
if (ths->FpEqMsg != NULL) {
ths->FpEqMsg(ths->msgcb, &rpcMsg);
if (ths->syncEqMsg != NULL) {
ths->syncEqMsg(ths->msgcb, &rpcMsg);
} else {
sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL");
sTrace("syncNodeEqNoop pSyncNode->syncEqMsg is NULL");
}
syncEntryDestory(pEntry);
......@@ -2944,8 +2878,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
SRpcMsg rpcMsgLocalCmd;
syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);
if (ths->FpEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->FpEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (code != 0) {
sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
rpcFreeCont(rpcMsgLocalCmd.pCont);
......@@ -3127,17 +3061,18 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
}
if (ths->pFsm->FpLeaderTransferCb != NULL) {
SFsmCbMeta cbMeta = {0};
cbMeta.code = 0;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.flag = 0;
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.state = ths->state;
cbMeta.term = pEntry->term;
ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, cbMeta);
SFsmCbMeta cbMeta = {
cbMeta.code = 0,
cbMeta.currentTerm = ths->pRaftStore->currentTerm,
cbMeta.flag = 0,
cbMeta.index = pEntry->index,
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
cbMeta.isWeak = pEntry->isWeak,
cbMeta.seqNum = pEntry->seqNum,
cbMeta.state = ths->state,
cbMeta.term = pEntry->term,
};
ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
}
syncLeaderTransferDestroy(pSyncLeaderTransfer);
......@@ -3315,18 +3250,20 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
// execute fsm in apply thread, or execute outside syncPropose
if (internalExecute) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.flag = flag;
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
SFsmCbMeta cbMeta = {
.index = pEntry->index,
.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
.isWeak = pEntry->isWeak,
.code = 0,
.state = ths->state,
.seqNum = pEntry->seqNum,
.term = pEntry->term,
.currentTerm = ths->pRaftStore->currentTerm,
.flag = flag,
};
syncGetAndDelRespRpc(ths, cbMeta.seqNum, &rpcMsg.info);
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
}
}
......
......@@ -145,16 +145,17 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
taosArrayPush(delIndexArray, pSeqNum);
cnt++;
SFsmCbMeta cbMeta = {0};
cbMeta.index = SYNC_INDEX_INVALID;
cbMeta.lastConfigIndex = SYNC_INDEX_INVALID;
cbMeta.isWeak = false;
cbMeta.code = TSDB_CODE_SYN_TIMEOUT;
cbMeta.state = pSyncNode->state;
cbMeta.seqNum = *pSeqNum;
cbMeta.term = SYNC_TERM_INVALID;
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
cbMeta.flag = 0;
SFsmCbMeta cbMeta = {
cbMeta.index = SYNC_INDEX_INVALID,
cbMeta.lastConfigIndex = SYNC_INDEX_INVALID,
cbMeta.isWeak = false,
cbMeta.code = TSDB_CODE_SYN_TIMEOUT,
cbMeta.state = pSyncNode->state,
cbMeta.seqNum = *pSeqNum,
cbMeta.term = SYNC_TERM_INVALID,
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm,
cbMeta.flag = 0,
};
pStub->rpcMsg.pCont = NULL;
pStub->rpcMsg.contLen = 0;
......
......@@ -177,18 +177,6 @@ char* syncUtilRaftId2Str(const SRaftId* p) {
}
const char* syncUtilState2String(ESyncState state) {
/*
if (state == TAOS_SYNC_STATE_FOLLOWER) {
return "TAOS_SYNC_STATE_FOLLOWER";
} else if (state == TAOS_SYNC_STATE_CANDIDATE) {
return "TAOS_SYNC_STATE_CANDIDATE";
} else if (state == TAOS_SYNC_STATE_LEADER) {
return "TAOS_SYNC_STATE_LEADER";
} else {
return "TAOS_SYNC_STATE_UNKNOWN";
}
*/
if (state == TAOS_SYNC_STATE_FOLLOWER) {
return "follower";
} else if (state == TAOS_SYNC_STATE_CANDIDATE) {
......
......@@ -195,8 +195,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
SSyncInfo syncInfo;
syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal;
......
......@@ -120,8 +120,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
SSyncInfo syncInfo;
syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal;
......
......@@ -45,8 +45,8 @@ SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWa
SSyncInfo syncInfo;
syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = NULL;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal;
......
......@@ -32,8 +32,8 @@ SSyncNode *pSyncNode;
SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
......@@ -25,8 +25,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......@@ -97,7 +97,7 @@ int main(int argc, char** argv) {
SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncEnqTest");
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
taosMsleep(1000);
}
......
......@@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......@@ -103,7 +103,7 @@ int main(int argc, char** argv) {
SEpSet epSet;
syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet);
rpcMsg.info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, &rpcMsg);
pSyncNode->syncSendMSg(&epSet, &rpcMsg);
taosMsleep(1000);
}
......
......@@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./sync_init_test");
......
......@@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
......@@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
......@@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
......@@ -100,8 +100,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
SSyncInfo syncInfo;
syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal;
......
......@@ -87,8 +87,8 @@ void initFsm() {
SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir);
......@@ -204,7 +204,7 @@ int main(int argc, char **argv) {
SyncClientRequest *pSyncClientRequest = pMsg1;
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg);
gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg);
taosMsleep(1000);
}
......
......@@ -217,8 +217,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
SSyncInfo syncInfo;
syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal;
......
......@@ -28,8 +28,8 @@ SSyncNode* pSyncNode;
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
......@@ -28,8 +28,8 @@ SSyncNode* pSyncNode;
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
......@@ -65,8 +65,8 @@ void initFsm() {
SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir);
......@@ -179,7 +179,7 @@ int main(int argc, char **argv) {
SyncClientRequest *pSyncClientRequest = pMsg1;
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg);
gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg);
taosMsleep(1000);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册