提交 b6aea6ae 编写于 作者: M Minghao Li

fix(sync): when apply queue not empty, can not read

上级 28377a9f
...@@ -138,6 +138,7 @@ typedef struct SSyncFSM { ...@@ -138,6 +138,7 @@ typedef struct SSyncFSM {
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm); void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm);
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); void (*FpReConfigCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SReConfigCbMeta* pMeta);
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm);
void (*FpBecomeLeaderCb)(const struct SSyncFSM* pFsm); void (*FpBecomeLeaderCb)(const struct SSyncFSM* pFsm);
void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm); void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm);
......
...@@ -202,6 +202,13 @@ static void mndBecomeLeader(const SSyncFSM *pFsm) { ...@@ -202,6 +202,13 @@ static void mndBecomeLeader(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
} }
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
return (itemSize == 0);
}
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pMnode; pFsm->data = pMnode;
...@@ -210,6 +217,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { ...@@ -210,6 +217,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
pFsm->FpRollBackCb = NULL; pFsm->FpRollBackCb = NULL;
pFsm->FpRestoreFinishCb = mndRestoreFinish; pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpLeaderTransferCb = NULL; pFsm->FpLeaderTransferCb = NULL;
pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
pFsm->FpReConfigCb = NULL; pFsm->FpReConfigCb = NULL;
pFsm->FpBecomeLeaderCb = mndBecomeLeader; pFsm->FpBecomeLeaderCb = mndBecomeLeader;
pFsm->FpBecomeFollowerCb = mndBecomeFollower; pFsm->FpBecomeFollowerCb = mndBecomeFollower;
......
...@@ -436,6 +436,12 @@ static void vnodeBecomeLeader(const SSyncFSM *pFsm) { ...@@ -436,6 +436,12 @@ static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
vDebug("vgId:%d, become leader", pVnode->config.vgId); vDebug("vgId:%d, become leader", pVnode->config.vgId);
} }
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data;
int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
return (itemSize == 0);
}
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pVnode; pFsm->data = pVnode;
...@@ -445,6 +451,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { ...@@ -445,6 +451,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = vnodeRestoreFinish; pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
pFsm->FpLeaderTransferCb = NULL; pFsm->FpLeaderTransferCb = NULL;
pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
pFsm->FpBecomeLeaderCb = vnodeBecomeLeader; pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
pFsm->FpBecomeFollowerCb = vnodeBecomeFollower; pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
pFsm->FpReConfigCb = NULL; pFsm->FpReConfigCb = NULL;
......
...@@ -452,16 +452,22 @@ bool syncIsReadyForRead(int64_t rid) { ...@@ -452,16 +452,22 @@ bool syncIsReadyForRead(int64_t rid) {
bool ready = false; bool ready = false;
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) {
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) { if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) {
SSyncRaftEntry* pEntry = NULL; // apply queue not empty
int32_t code = pSyncNode->pLogStore->syncLogGetEntry( ready = false;
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
if (code == 0 && pEntry != NULL) { } else {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
ready = true; SSyncRaftEntry* pEntry = NULL;
} int32_t code = pSyncNode->pLogStore->syncLogGetEntry(
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
if (code == 0 && pEntry != NULL) {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
ready = true;
}
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
}
} }
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册