提交 b59bee66 编写于 作者: S Shengliang Guan

fix: restart snapshot sender on receiver is restart

上级 773423a6
...@@ -112,7 +112,7 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S ...@@ -112,7 +112,7 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S
return prevLogTerm; return prevLogTerm;
} }
sError("vgId:%d, failed to get log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), prevIndex); sInfo("vgId:%d, failed to get log term since %s. index:%" PRId64, pNode->vgId, terrstr(), prevIndex);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1; return -1;
} }
......
...@@ -115,8 +115,8 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI ...@@ -115,8 +115,8 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI
const char* sysErrStr = strerror(errno); const char* sysErrStr = strerror(errno);
sNError(pData->pSyncNode, sNError(pData->pSyncNode,
"wal restore from snapshot error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", "wal restore from snapshot error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex,
snapshotIndex, err, err, errStr, sysErr, sysErrStr); err, errStr, sysErr, sysErrStr);
return -1; return -1;
} }
...@@ -212,8 +212,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -212,8 +212,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
int32_t sysErr = errno; int32_t sysErr = errno;
const char* sysErrStr = strerror(errno); const char* sysErrStr = strerror(errno);
sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, err, errStr, sysErr, sysErrStr); pEntry->index, err, errStr, sysErr, sysErrStr);
return -1; return -1;
} }
...@@ -257,11 +257,11 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR ...@@ -257,11 +257,11 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
const char* sysErrStr = strerror(errno); const char* sysErrStr = strerror(errno);
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
sNTrace(pData->pSyncNode, "wal read not exist, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, sNTrace(pData->pSyncNode, "wal read not exist, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", index,
err, err, errStr, sysErr, sysErrStr); err, errStr, sysErr, sysErrStr);
} else { } else {
sNTrace(pData->pSyncNode, "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, sNTrace(pData->pSyncNode, "wal read error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", index, err,
err, err, errStr, sysErr, sysErrStr); errStr, sysErr, sysErrStr);
} }
/* /*
...@@ -341,8 +341,8 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn ...@@ -341,8 +341,8 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t sysErr = errno; int32_t sysErr = errno;
const char* sysErrStr = strerror(errno); const char* sysErrStr = strerror(errno);
sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
pData->pSyncNode->vgId, fromIndex, err, err, errStr, sysErr, sysErrStr); pData->pSyncNode->vgId, fromIndex, err, errStr, sysErr, sysErrStr);
} }
// event log // event log
...@@ -392,8 +392,8 @@ int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -392,8 +392,8 @@ int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t sysErr = errno; int32_t sysErr = errno;
const char* sysErrStr = strerror(errno); const char* sysErrStr = strerror(errno);
sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
pData->pSyncNode->vgId, index, err, err, errStr, sysErr, sysErrStr); pData->pSyncNode->vgId, index, err, errStr, sysErr, sysErrStr);
return -1; return -1;
} }
return 0; return 0;
......
...@@ -747,7 +747,7 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS ...@@ -747,7 +747,7 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg // send msg
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver receiving"); syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver received");
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr()); sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
return -1; return -1;
...@@ -979,32 +979,31 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { ...@@ -979,32 +979,31 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
return -1; return -1;
} }
if (pMsg->startTime != pSender->startTime) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender:% " PRId64 " receiver:%" PRId64 " time not match");
return -1;
}
// state, term, seq/ack // state, term, seq/ack
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");
sSError(pSender, "snapshot sender not leader"); sSError(pSender, "snapshot sender not leader");
return -1; goto _ERROR;
}
if (pMsg->startTime != pSender->startTime) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match");
sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, code:0x%x", pMsg->startTime,
pSender->startTime, pMsg->code);
goto _ERROR;
} }
if (pMsg->term != pSyncNode->pRaftStore->currentTerm) { if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
sSError(pSender, "snapshot sender term not equal"); syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
return -1; sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
pSyncNode->pRaftStore->currentTerm);
goto _ERROR;
} }
if (pMsg->code != 0) { if (pMsg->code != 0) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code"); syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
sSError(pSender, "snapshot sender receive error code:0x%x and stop sender", pMsg->code); sSError(pSender, "snapshot sender receive error code:0x%x and stop sender", pMsg->code);
snapshotSenderStop(pSender, true); goto _ERROR;
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) {
syncLogReplMgrReset(pMgr);
}
return -1;
} }
// prepare <begin, end>, send begin msg // prepare <begin, end>, send begin msg
...@@ -1068,4 +1067,14 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { ...@@ -1068,4 +1067,14 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
} }
return 0; return 0;
_ERROR:
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr");
syncLogReplMgrReset(pMgr);
}
return -1;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册