未验证 提交 49954b32 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #14730 from taosdata/feature/3.0_mhli

fix(sync): vnode snapshot sender
...@@ -450,42 +450,81 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta ...@@ -450,42 +450,81 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType)); syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
} }
#define USE_TSDB_SNAPSHOT
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam; SSnapshotParam *pSnapshotParam = pParam;
int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader); int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
return code; return code;
#else
*ppReader = taosMemoryMalloc(32);
return 0;
#endif
} }
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapReaderClose(pReader); int32_t code = vnodeSnapReaderClose(pReader);
return code; return code;
#else
taosMemoryFree(pReader);
return 0;
#endif
} }
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len); int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
return code; return code;
#else
static int32_t times = 0;
if (times++ < 5) {
*len = 64;
*ppBuf = taosMemoryMalloc(*len);
snprintf(*ppBuf, *len, "snapshot block %d", times);
} else {
*len = 0;
*ppBuf = NULL;
}
return 0;
#endif
} }
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam; SSnapshotParam *pSnapshotParam = pParam;
int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter); int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
return code; return code;
#else
*ppWriter = taosMemoryMalloc(32);
return 0;
#endif
} }
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapWriterClose(pWriter, !isApply); int32_t code = vnodeSnapWriterClose(pWriter, !isApply);
return code; return code;
#else
taosMemoryFree(pWriter);
return 0;
#endif
} }
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapWrite(pWriter, pBuf, len); int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
return code; return code;
#else
return 0;
#endif
} }
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
...@@ -509,6 +548,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { ...@@ -509,6 +548,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
SSyncInfo syncInfo = { SSyncInfo syncInfo = {
//.snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
.snapshotStrategy = SYNC_STRATEGY_NO_SNAPSHOT, .snapshotStrategy = SYNC_STRATEGY_NO_SNAPSHOT,
.batchSize = 10, .batchSize = 10,
.vgId = pVnode->config.vgId, .vgId = pVnode->config.vgId,
......
...@@ -235,10 +235,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -235,10 +235,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
// do nothing // do nothing
} else { } else {
SSyncRaftEntry* pEntry; SSnapshot oldSnapshot;
int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, nextIndex, &pEntry); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot);
ASSERT(code == 0); SyncTerm newSnapshotTerm = oldSnapshot.lastApplyTerm;
syncNodeStartSnapshotOnce(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg); syncNodeStartSnapshotOnce(ths, SYNC_INDEX_BEGIN, nextIndex, newSnapshotTerm, pMsg);
// get sender // get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
......
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 #system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode2 -s start
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册