提交 668a26c1 编写于 作者: S Shengliang Guan

enh: handle error while transfer snapshot

上级 1aad2670
......@@ -238,7 +238,7 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);
void syncNodeResetElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
......
......@@ -61,7 +61,7 @@ typedef struct SSyncLogBuffer {
// SSyncLogRepMgr
SSyncLogReplMgr* syncLogReplMgrCreate();
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr);
int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr);
void syncLogReplMgrReset(SSyncLogReplMgr* pMgr);
int32_t syncNodeLogReplMgrInit(SSyncNode* pNode);
void syncNodeLogReplMgrDestroy(SSyncNode* pNode);
......
......@@ -56,7 +56,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender);
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
......@@ -79,8 +79,8 @@ typedef struct SSyncSnapshotReceiver {
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
......
......@@ -200,12 +200,15 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
code = syncNodeOnLocalCmd(pSyncNode, pMsg);
break;
default:
sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg,
TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
}
syncNodeRelease(pSyncNode);
if (code != 0) {
sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
terrno);
}
return code;
}
......@@ -228,8 +231,7 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
syncNodeRelease(pNode);
if (ret == 1) {
sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle,
rpcMsg.info.ahandle);
sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
rpcSendResponse(&rpcMsg);
return 0;
} else {
......@@ -1084,13 +1086,17 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// snapshot senders
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
// ASSERT(pSender != NULL);
(pSyncNode->senders)[i] = pSender;
sSDebug(pSender, "snapshot sender create new while open, data:%p", pSender);
if (pSender == NULL) return NULL;
pSyncNode->senders[i] = pSender;
sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
}
// snapshot receivers
pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
if (pSyncNode->pNewNodeReceiver == NULL) return NULL;
sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
pSyncNode->pNewNodeReceiver);
// is config changing
pSyncNode->changing = false;
......@@ -1131,10 +1137,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->hbrSlowNum = 0;
pSyncNode->tmrRoutineNum = 0;
sNInfo(pSyncNode, "sync open, node:%p", pSyncNode);
sTrace("vgId:%d, tsElectInterval:%d, tsHeartbeatInterval:%d, tsHeartbeatTimeout:%d", pSyncNode->vgId, tsElectInterval,
tsHeartbeatInterval, tsHeartbeatTimeout);
sNInfo(pSyncNode, "sync open, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
return pSyncNode;
_error:
......@@ -1251,6 +1255,8 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
}
sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
pSyncNode->pNewNodeReceiver);
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
pSyncNode->pNewNodeReceiver = NULL;
}
......@@ -1295,15 +1301,15 @@ void syncNodeClose(SSyncNode* pSyncNode) {
syncNodeStopHeartbeatTimer(pSyncNode);
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] != NULL) {
sSTrace((pSyncNode->senders)[i], "snapshot sender destroy while close, data:%p", (pSyncNode->senders)[i]);
if (pSyncNode->senders[i] != NULL) {
sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[i]);
if (snapshotSenderIsStart((pSyncNode->senders)[i])) {
snapshotSenderStop((pSyncNode->senders)[i], false);
if (snapshotSenderIsStart(pSyncNode->senders[i])) {
snapshotSenderStop(pSyncNode->senders[i], false);
}
snapshotSenderDestroy((pSyncNode->senders)[i]);
(pSyncNode->senders)[i] = NULL;
snapshotSenderDestroy(pSyncNode->senders[i]);
pSyncNode->senders[i] = NULL;
}
}
......@@ -1312,6 +1318,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
}
sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
pSyncNode->pNewNodeReceiver = NULL;
}
......@@ -1382,8 +1389,7 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
return ret;
}
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
int32_t electMS;
if (pSyncNode->pRaftCfg->isStandBy) {
......@@ -1391,11 +1397,11 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
} else {
electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
}
ret = syncNodeRestartElectTimer(pSyncNode, electMS);
(void)syncNodeRestartElectTimer(pSyncNode, electMS);
sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
electMS);
return ret;
}
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
......@@ -1455,23 +1461,20 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
return 0;
}
// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilRaftId2EpSet(destRaftId, &epSet);
if (pSyncNode->syncSendMSg != NULL) {
// htonl
syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1;
pSyncNode->syncSendMSg(&epSet, pMsg);
return pSyncNode->syncSendMSg(&epSet, pMsg);
} else {
sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
rpcFreeCont(pMsg->pCont);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
return 0;
}
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
......@@ -1586,7 +1589,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i];
oldSenders[i] = pSyncNode->senders[i];
sSTrace(oldSenders[i], "snapshot sender save old");
}
......@@ -1625,7 +1628,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// clear new
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
(pSyncNode->senders)[i] = NULL;
pSyncNode->senders[i] = NULL;
}
// reset new
......@@ -1640,16 +1643,16 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
sNTrace(pSyncNode, "snapshot sender reset for: %" PRId64 ", newIndex:%d, %s:%d, %p",
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);
(pSyncNode->senders)[i] = oldSenders[j];
pSyncNode->senders[i] = oldSenders[j];
oldSenders[j] = NULL;
reset = true;
// reset replicaIndex
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
(pSyncNode->senders)[i]->replicaIndex = i;
int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
pSyncNode->senders[i]->replicaIndex = i;
sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
i, host, port, (pSyncNode->senders)[i], reset);
i, host, port, pSyncNode->senders[i], reset);
break;
}
......@@ -1658,18 +1661,23 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// create new
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
sSTrace((pSyncNode->senders)[i], "snapshot sender create new while reconfig, data:%p", (pSyncNode->senders)[i]);
if (pSyncNode->senders[i] == NULL) {
pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
if (pSyncNode->senders[i] == NULL) {
// will be created later while send snapshot
sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
} else {
sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
}
} else {
sSTrace((pSyncNode->senders)[i], "snapshot sender already exist, data:%p", (pSyncNode->senders)[i]);
sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
}
}
// free old
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (oldSenders[i] != NULL) {
sNTrace(pSyncNode, "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
snapshotSenderDestroy(oldSenders[i]);
oldSenders[i] = NULL;
}
......@@ -1844,8 +1852,8 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
if (pMySender != NULL) {
for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
}
}
(pMySender->privateTerm) += 100;
......
......@@ -566,7 +566,9 @@ _out:
return ret;
}
int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
void syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
if (pMgr == NULL) return;
ASSERT(pMgr->startIndex >= 0);
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0]));
......@@ -576,7 +578,6 @@ int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
pMgr->endIndex = 0;
pMgr->restored = false;
pMgr->retryBackoff = 0;
return 0;
}
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
......
......@@ -54,7 +54,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
if (pSender == NULL) return;
sDebug("vgId:%d, snapshot sender destroy", pSender->pSyncNode->vgId);
// free current block
if (pSender->pCurrentBlock != NULL) {
......@@ -75,12 +74,6 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (snapshotSenderIsStart(pSender)) {
sSError(pSender, "vgId:%d, snapshot sender is already start");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
pSender->start = true;
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
......@@ -95,7 +88,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
memset(&(pSender->lastConfig), 0, sizeof(pSender->lastConfig));
memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
pSender->sendingMS = 0;
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
pSender->startTime = taosGetTimestampMs();
......@@ -111,7 +104,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
......@@ -122,7 +115,6 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
// event log
sSDebug(pSender, "snapshot sender start");
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");
// send msg
......@@ -134,7 +126,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
return 0;
}
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);
// update flag
......@@ -154,8 +146,6 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
pSender->pCurrentBlock = NULL;
pSender->blockLen = 0;
}
return 0;
}
// when sender receive ack, call this function to send msg from seq
......@@ -177,8 +167,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
}
if (pSender->blockLen > 0) {
sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
// has read data
sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
} else {
// read finish, update seq to end
pSender->seq = SYNC_SNAPSHOT_SEQ_END;
......@@ -194,7 +184,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
......@@ -202,7 +192,6 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->seq = pSender->seq;
// pMsg->privateTerm = pSender->privateTerm;
if (pSender->pCurrentBlock != NULL) {
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
......@@ -210,10 +199,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// event log
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
sSDebug(pSender, "snapshot sender finish, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish");
} else {
sSDebug(pSender, "snapshot sender sending, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending");
}
......@@ -238,7 +225,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
......@@ -248,12 +235,10 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
pMsg->seq = pSender->seq;
if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
// pMsg->privateTerm = pSender->privateTerm;
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
}
// event log
sSDebug(pSender, "snapshot sender resend, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender resend");
// send msg
......@@ -299,13 +284,10 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
sSInfo(pSender, "snapshot sender start too frequently, ignore");
return 1;
return 0;
}
char host[64];
uint16_t port;
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
sSInfo(pSender, "snapshot sender start for peer:%s:%u", host, port);
sSInfo(pSender, "snapshot sender start");
int32_t code = snapshotSenderStart(pSender);
if (code != 0) {
......@@ -338,13 +320,11 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->snapshot.lastApplyTerm = 0;
pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
sDebug("vgId:%d, snapshot receiver create", pSyncNode->vgId);
return pReceiver;
}
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver == NULL) return;
sDebug("vgId:%d, snapshot receiver destroy", pReceiver->pSyncNode->vgId);
// close writer
if (pReceiver->pWriter != NULL) {
......@@ -368,7 +348,6 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
// force close, abandon incomplete data
if (pReceiver->pWriter != NULL) {
// event log
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&pReceiver->snapshot);
if (ret != 0) {
......@@ -380,13 +359,7 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
pReceiver->start = false;
}
int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
if (!snapshotReceiverIsStart(pReceiver)) {
sRError(pReceiver, "snapshot receiver is not start");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
if (pReceiver->pWriter != NULL) {
sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
......@@ -416,10 +389,10 @@ int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapsh
return 0;
}
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
if (snapshotReceiverIsStart(pReceiver)) {
sRInfo(pReceiver, "snapshot receiver has started");
return 0;
return;
}
pReceiver->start = true;
......@@ -430,12 +403,11 @@ int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend
// event log
sRInfo(pReceiver, "snapshot receiver is start");
return 0;
}
// just set start = false
// FpSnapshotStopWrite should not be called, assert writer == NULL
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);
if (pReceiver->pWriter != NULL) {
......@@ -450,17 +422,10 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
}
pReceiver->start = false;
return 0;
}
// when recv last snapshot block, apply data into snapshot
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
if (pMsg->seq != SYNC_SNAPSHOT_SEQ_END) {
sRError(pReceiver, "snapshot receiver seq:%d is invalid", pMsg->seq);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
int32_t code = 0;
if (pReceiver->pWriter != NULL) {
// write data
......@@ -582,6 +547,7 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int64_t timeNow = taosGetTimestampMs();
int32_t code = 0;
if (snapshotReceiverIsStart(pReceiver)) {
// already start
......@@ -593,14 +559,14 @@ static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " == msg startTime:%" PRId64 " send reply",
pReceiver->startTime, pMsg->startTime);
goto _SEND_REPLY;
} else {
// ignore
sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " ignore",
pReceiver->startTime, pMsg->startTime);
return 0;
sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " ignore",
pReceiver->startTime, pMsg->startTime);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
code = terrno;
goto _SEND_REPLY;
}
} else {
// start new
sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
......@@ -611,7 +577,8 @@ _START_RECEIVER:
if (timeNow - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
sRError(pReceiver, "snapshot receiver time skew too much, now:%" PRId64 " msg startTime:%" PRId64, timeNow,
pMsg->startTime);
return -1;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
code = terrno;
} else {
// waiting for clock match
while (timeNow < pMsg->startTime) {
......@@ -647,7 +614,7 @@ _SEND_REPLY:
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
pRspMsg->ack = pMsg->seq; // receiver maybe already closed
pRspMsg->code = 0;
pRspMsg->code = code;
pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
// send msg
......@@ -657,26 +624,36 @@ _SEND_REPLY:
return -1;
}
return 0;
return code;
}
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// condition 1
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int32_t code = TSDB_CODE_SYN_INTERNAL_ERROR;
if (!snapshotReceiverIsStart(pReceiver)) {
sRError(pReceiver, "snapshot receiver not start");
return -1;
sRError(pReceiver, "snapshot receiver begin failed since not start");
goto _SEND_REPLY;
}
if (pReceiver->startTime != pMsg->startTime) {
sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
sRError(pReceiver, "snapshot receiver begin failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
pReceiver->startTime, pMsg->startTime);
return -1;
goto _SEND_REPLY;
}
// start writer
snapshotReceiverStartWriter(pReceiver, pMsg);
if (snapshotReceiverStartWriter(pReceiver, pMsg) != 0) {
sRError(pReceiver, "snapshot receiver begin failed since start writer failed");
goto _SEND_REPLY;
}
code = 0;
_SEND_REPLY:
if (code != 0 && terrno != 0) {
code = terrno;
}
// build msg
SRpcMsg rpcMsg = {0};
......@@ -693,7 +670,7 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = 0;
pRspMsg->code = code;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg
......@@ -703,10 +680,10 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
return -1;
}
return 0;
return code;
}
static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// condition 4
// transfering
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
......@@ -753,7 +730,7 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
return -1;
}
return 0;
return code;
}
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
......@@ -790,7 +767,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = 0;
pRspMsg->code = code;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg
......@@ -800,7 +777,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
return -1;
}
return 0;
return code;
}
// receiver on message
......@@ -830,12 +807,14 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
// if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
return 0;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
if (pMsg->term < pSyncNode->pRaftStore->currentTerm) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
return 0;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
if (pMsg->term > pSyncNode->pRaftStore->currentTerm) {
......@@ -844,20 +823,21 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
syncNodeResetElectTimer(pSyncNode);
// state, term, seq/ack
int32_t code = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
syncNodeOnSnapshotPre(pSyncNode, pMsg);
code = syncNodeOnSnapshotPre(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin");
syncNodeOnSnapshotBegin(pSyncNode, pMsg);
code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq end");
syncNodeOnSnapshotEnd(pSyncNode, pMsg);
code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
return -1;
code = -1;
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
// force close, no response
......@@ -865,35 +845,27 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
snapshotReceiverForceStop(pReceiver);
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
syncNodeOnSnapshotTransfering(pSyncNode, pMsg);
code = syncNodeOnSnapshotReceive(pSyncNode, pMsg);
} else {
// error log
sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
return -1;
code = -1;
}
} else {
// error log
sRError(pReceiver, "snapshot receiver term not equal");
return -1;
code = -1;
}
} else {
// error log
sRError(pReceiver, "snapshot receiver not follower");
return -1;
code = -1;
}
return 0;
return code;
}
int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// get sender
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
if (pSender == NULL) {
sNError(pSyncNode, "prepare snapshot error since sender is null");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
static int32_t syncNodeOnSnapshotPreRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
SSnapshot snapshot = {0};
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
......@@ -915,7 +887,7 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
pSender->snapshot = snapshot;
// start reader
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &(pSender->snapshotParam), &(pSender->pReader));
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
if (code != 0) {
sSError(pSender, "prepare snapshot failed since %s", terrstr());
return -1;
......@@ -936,7 +908,7 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
pSendMsg->srcId = pSender->pSyncNode->myRaftId;
pSendMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pSendMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pSendMsg->beginIndex = pSender->snapshotParam.start;
pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
......@@ -966,8 +938,9 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
SyncSnapshotRsp *pMsg = pRpcMsg->pCont;
// if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) {
if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
......@@ -983,6 +956,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");
sSError(pSender, "snapshot sender not leader");
terrno = TSDB_CODE_SYN_NOT_LEADER;
goto _ERROR;
}
......@@ -990,6 +964,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match");
sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, code:0x%x", pMsg->startTime,
pSender->startTime, pMsg->code);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _ERROR;
}
......@@ -997,20 +972,21 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
pSyncNode->pRaftStore->currentTerm);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _ERROR;
}
if (pMsg->code != 0) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
sSError(pSender, "snapshot sender receive error code:0x%x and stop sender", pMsg->code);
terrno = pMsg->code;
goto _ERROR;
}
// prepare <begin, end>, send begin msg
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
syncNodeOnSnapshotReplyPre(pSyncNode, pMsg);
return 0;
return syncNodeOnSnapshotPreRsp(pSyncNode, pSender, pMsg);
}
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
......@@ -1030,10 +1006,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr");
syncLogReplMgrReset(pMgr);
}
syncLogReplMgrReset(pMgr);
return 0;
}
......@@ -1047,22 +1020,19 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if (snapshotSend(pSender) != 0) {
return -1;
}
} else if (pMsg->ack == pSender->seq - 1) {
// maybe resend
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
snapshotReSend(pSender);
if (snapshotReSend(pSender) != 0) {
return -1;
}
} else {
// error log
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) {
syncLogReplMgrReset(pMgr);
}
syncLogReplMgrReset(pMgr);
return -1;
}
......@@ -1071,10 +1041,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
_ERROR:
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr");
syncLogReplMgrReset(pMgr);
}
syncLogReplMgrReset(pMgr);
return -1;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册