提交 48b6bd43 编写于 作者: S Shengliang Guan

fix: restart snapshot sender on receiver is restart

上级 053f48e3
...@@ -517,6 +517,7 @@ int32_t* taosGetErrno(); ...@@ -517,6 +517,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912) #define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912)
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913) #define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
#define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914) #define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914)
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq // tq
......
...@@ -234,10 +234,11 @@ int vnodeAsyncCommit(SVnode *pVnode) { ...@@ -234,10 +234,11 @@ int vnodeAsyncCommit(SVnode *pVnode) {
_exit: _exit:
if (code) { if (code) {
vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), vError("vgId:%d, vnode async commit failed since %s, commitId:%" PRId64, TD_VID(pVnode), tstrerror(code),
pVnode->state.commitID); pVnode->state.commitID);
} else { } else {
vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__); vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode),
pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied);
} }
return code; return code;
} }
...@@ -256,7 +257,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { ...@@ -256,7 +257,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
SVnode *pVnode = pInfo->pVnode; SVnode *pVnode = pInfo->pVnode;
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode), vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
pVnode->state.commitID, pVnode->state.applied, pVnode->state.applyTerm); pVnode->state.commitID, pVnode->state.applied, pVnode->state.applyTerm);
// persist wal before starting // persist wal before starting
......
...@@ -423,7 +423,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { ...@@ -423,7 +423,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
ASSERT(pHdr->index == pWriter->index + 1); ASSERT(pHdr->index == pWriter->index + 1);
pWriter->index = pHdr->index; pWriter->index = pHdr->index;
vInfo("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index, vInfo("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d blockLen:%d", TD_VID(pVnode), pHdr->index,
pHdr->type, nData); pHdr->type, nData);
switch (pHdr->type) { switch (pHdr->type) {
......
...@@ -465,9 +465,9 @@ static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool ...@@ -465,9 +465,9 @@ static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len); vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
int32_t code = vnodeSnapWrite(pWriter, pBuf, len); int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
vDebug("vgId:%d, continue write vnode snapshot finished, len:%d", pVnode->config.vgId, len); vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
return code; return code;
} }
......
...@@ -294,7 +294,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { ...@@ -294,7 +294,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
} }
if (snapshotSenderIsStart(pSender)) { if (snapshotSenderIsStart(pSender)) {
sSError(pSender, "snapshot sender already start, ignore"); sSInfo(pSender, "snapshot sender already start, ignore");
return 0; return 0;
} }
...@@ -523,7 +523,7 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap ...@@ -523,7 +523,7 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
if (pMsg->seq != pReceiver->ack + 1) { if (pMsg->seq != pReceiver->ack + 1) {
sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq); sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
return -1; return -1;
} }
...@@ -721,8 +721,12 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS ...@@ -721,8 +721,12 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
timeNow = taosGetTimestampMs(); timeNow = taosGetTimestampMs();
} }
int32_t code = 0;
if (snapshotReceiverGotData(pReceiver, pMsg) != 0) { if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
return -1; code = terrno;
if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
}
} }
// build msg // build msg
...@@ -740,7 +744,7 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS ...@@ -740,7 +744,7 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime; pRspMsg->startTime = pReceiver->startTime;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = 0; pRspMsg->code = code;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg // send msg
...@@ -861,7 +865,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { ...@@ -861,7 +865,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop"); syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
snapshotReceiverForceStop(pReceiver); snapshotReceiverForceStop(pReceiver);
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq"); syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
syncNodeOnSnapshotTransfering(pSyncNode, pMsg); syncNodeOnSnapshotTransfering(pSyncNode, pMsg);
} else { } else {
// error log // error log
...@@ -982,68 +986,85 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { ...@@ -982,68 +986,85 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
} }
// state, term, seq/ack // state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { sSError(pSender, "snapshot sender not leader");
// prepare <begin, end>, send begin msg return -1;
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { }
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
syncNodeOnSnapshotReplyPre(pSyncNode, pMsg);
return 0;
}
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin"); sSError(pSender, "snapshot sender term not equal");
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { return -1;
return -1; }
}
if (snapshotSend(pSender) != 0) { if (pMsg->code != 0) {
return -1; syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
} sSError(pSender, "snapshot sender receive error code:0x%x and stop sender", pMsg->code);
return 0; snapshotSenderStop(pSender, true);
} SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) {
syncLogReplMgrReset(pMgr);
}
// receive ack is finish, close sender return -1;
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { }
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr");
syncLogReplMgrReset(pMgr);
}
return 0;
}
// send next msg // prepare <begin, end>, send begin msg
if (pMsg->ack == pSender->seq) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq"); syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
// update sender ack syncNodeOnSnapshotReplyPre(pSyncNode, pMsg);
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { return 0;
return -1; }
}
if (snapshotSend(pSender) != 0) {
return -1;
}
} else if (pMsg->ack == pSender->seq - 1) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
// maybe resend syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend"); if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
snapshotReSend(pSender); return -1;
}
} else { if (snapshotSend(pSender) != 0) {
// error log return -1;
sSError(pSender, "snapshot sender recv error ack:%d, my seq:%d", pMsg->ack, pSender->seq); }
return -1; return 0;
} }
} else {
// error log // receive ack is finish, close sender
sSError(pSender, "snapshot sender term not equal"); if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr");
syncLogReplMgrReset(pMgr);
}
return 0;
}
// send next msg
if (pMsg->ack == pSender->seq) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq data");
// update sender ack
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
return -1; return -1;
} }
if (snapshotSend(pSender) != 0) {
return -1;
}
} else if (pMsg->ack == pSender->seq - 1) {
// maybe resend
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
snapshotReSend(pSender);
} else { } else {
// error log // error log
sSError(pSender, "snapshot sender not leader"); syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) {
syncLogReplMgrReset(pMgr);
}
return -1; return -1;
} }
......
...@@ -47,7 +47,7 @@ void init() { ...@@ -47,7 +47,7 @@ void init() {
pSyncNode->pWal = pWal; pSyncNode->pWal = pWal;
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb; // pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb;
} }
void cleanup() { void cleanup() {
......
...@@ -47,7 +47,7 @@ void init() { ...@@ -47,7 +47,7 @@ void init() {
pSyncNode->pWal = pWal; pSyncNode->pWal = pWal;
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb; // pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb;
} }
void cleanup() { void cleanup() {
......
...@@ -405,6 +405,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for pr ...@@ -405,6 +405,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for pr
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for standby") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for standby")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
//tq //tq
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册