提交 d672e7a0 编写于 作者: M Minghao Li

refactor(sync): add FpApplyQueueItems in fsm

上级 0656cd8f
......@@ -139,6 +139,7 @@ typedef struct SSyncFSM {
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SReConfigCbMeta* pMeta);
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm);
int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm);
void (*FpBecomeLeaderCb)(const struct SSyncFSM* pFsm);
void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm);
......
......@@ -205,8 +205,23 @@ static void mndBecomeLeader(const SSyncFSM *pFsm) {
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
return (itemSize == 0);
if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) {
int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
return (itemSize == 0);
} else {
return true;
}
}
static int32_t mndApplyQueueItems(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) {
int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
return itemSize;
} else {
return -1;
}
}
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
......@@ -218,6 +233,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpLeaderTransferCb = NULL;
pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
pFsm->FpApplyQueueItems = mndApplyQueueItems;
pFsm->FpReConfigCb = NULL;
pFsm->FpBecomeLeaderCb = mndBecomeLeader;
pFsm->FpBecomeFollowerCb = mndBecomeFollower;
......@@ -291,7 +307,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
if (req.contLen <= 0) return -1;
req.pCont = rpcMallocCont(req.contLen);
if (req.pCont == NULL) return -1;
memcpy(req.pCont, pRaw, req.contLen);
......
......@@ -438,8 +438,24 @@ static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data;
int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
return (itemSize == 0);
if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
return (itemSize == 0);
} else {
return true;
}
}
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data;
if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
return itemSize;
} else {
return -1;
}
}
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
......@@ -452,6 +468,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
pFsm->FpLeaderTransferCb = NULL;
pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
pFsm->FpReConfigCb = NULL;
......
此差异已折叠。
......@@ -205,7 +205,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
if (i < pSyncNode->replicaNum - 1) {
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 ", ", i, pState->lastSendIndex,
pState->lastSendTime);
pState->lastSendTime);
} else {
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 "}", i, pState->lastSendIndex,
pState->lastSendTime);
......@@ -242,6 +242,8 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
va_end(argpointer);
// int32_t aq = pNode->pFsm->FpApplyQueueItems(pNode->pFsm);
taosPrintLog(flags, level, dflag,
"vgId:%d, sync %s "
"%s"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册