未验证 提交 57a0fcd8 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #15720 from taosdata/feature/3.0_mhli

refactor(sync): change wal fsync flag to false
...@@ -149,7 +149,7 @@ int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWrit ...@@ -149,7 +149,7 @@ int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWrit
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply, mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex); pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStopWrite(pMnode->pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, return sdbStopWrite(pMnode->pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
pSnapshot->lastConfigIndex); pSnapshot->lastConfigIndex);
......
...@@ -630,7 +630,7 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool ...@@ -630,7 +630,7 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply, vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex); pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot); int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code); vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
......
...@@ -573,7 +573,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -573,7 +573,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// fsync once // fsync once
SSyncLogStoreData* pData = ths->pLogStore->data; SSyncLogStoreData* pData = ths->pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
walFsync(pWal, true); walFsync(pWal, false);
// update match index // update match index
matchIndex = pMsg->prevLogIndex + pMsg->dataCount; matchIndex = pMsg->prevLogIndex + pMsg->dataCount;
...@@ -694,7 +694,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -694,7 +694,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// fsync once // fsync once
SSyncLogStoreData* pData = ths->pLogStore->data; SSyncLogStoreData* pData = ths->pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
walFsync(pWal, true); walFsync(pWal, false);
} }
// prepare response msg // prepare response msg
......
...@@ -206,7 +206,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -206,7 +206,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
SyncIndex index = 0; SyncIndex index = 0;
SWalSyncInfo syncMeta; SWalSyncInfo syncMeta = {0};
syncMeta.isWeek = pEntry->isWeak; syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum; syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term; syncMeta.term = pEntry->term;
...@@ -444,7 +444,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ...@@ -444,7 +444,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
SyncIndex index = 0; SyncIndex index = 0;
SWalSyncInfo syncMeta; SWalSyncInfo syncMeta = {0};
syncMeta.isWeek = pEntry->isWeak; syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum; syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term; syncMeta.term = pEntry->term;
......
...@@ -132,8 +132,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { ...@@ -132,8 +132,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) { if (preLogTerm == SYNC_TERM_INVALID) {
// SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
SyncIndex newNextIndex = nextIndex + 1; // SyncIndex newNextIndex = nextIndex + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
...@@ -224,8 +225,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { ...@@ -224,8 +225,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) { if (preLogTerm == SYNC_TERM_INVALID) {
// SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
SyncIndex newNextIndex = nextIndex + 1; // SyncIndex newNextIndex = nextIndex + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册