未验证 提交 83112644 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19149 from taosdata/enh/TD-21492

enh: handle error while transfer snapshot
......@@ -113,8 +113,8 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
pHdr->size = nData;
memcpy(pHdr->data, pData, nData);
metaInfo("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " nData:%d",
TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
_exit:
return code;
......
......@@ -257,8 +257,8 @@ _exit:
pReader->index++;
*nData = sizeof(SSnapDataHdr) + pHdr->size;
pHdr->index = pReader->index;
vInfo("vgId:%d, vnode snapshot read data,index:%" PRId64 " type:%d nData:%d ", TD_VID(pReader->pVnode),
pReader->index, pHdr->type, *nData);
vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", TD_VID(pReader->pVnode),
pReader->index, pHdr->type, *nData);
} else {
vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index);
}
......
......@@ -238,7 +238,7 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);
void syncNodeResetElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
......
......@@ -61,7 +61,7 @@ typedef struct SSyncLogBuffer {
// SSyncLogRepMgr
SSyncLogReplMgr* syncLogReplMgrCreate();
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr);
int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr);
void syncLogReplMgrReset(SSyncLogReplMgr* pMgr);
int32_t syncNodeLogReplMgrInit(SSyncNode* pNode);
void syncNodeLogReplMgrDestroy(SSyncNode* pNode);
......
......@@ -56,7 +56,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender);
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
......@@ -79,8 +79,8 @@ typedef struct SSyncSnapshotReceiver {
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
......
......@@ -200,12 +200,15 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
code = syncNodeOnLocalCmd(pSyncNode, pMsg);
break;
default:
sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg,
TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
}
syncNodeRelease(pSyncNode);
if (code != 0) {
sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
terrno);
}
return code;
}
......@@ -228,8 +231,7 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
syncNodeRelease(pNode);
if (ret == 1) {
sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle,
rpcMsg.info.ahandle);
sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
rpcSendResponse(&rpcMsg);
return 0;
} else {
......@@ -1084,13 +1086,17 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// snapshot senders
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
// ASSERT(pSender != NULL);
(pSyncNode->senders)[i] = pSender;
sSDebug(pSender, "snapshot sender create new while open, data:%p", pSender);
if (pSender == NULL) return NULL;
pSyncNode->senders[i] = pSender;
sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
}
// snapshot receivers
pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
if (pSyncNode->pNewNodeReceiver == NULL) return NULL;
sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
pSyncNode->pNewNodeReceiver);
// is config changing
pSyncNode->changing = false;
......@@ -1131,10 +1137,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->hbrSlowNum = 0;
pSyncNode->tmrRoutineNum = 0;
sNInfo(pSyncNode, "sync open, node:%p", pSyncNode);
sTrace("vgId:%d, tsElectInterval:%d, tsHeartbeatInterval:%d, tsHeartbeatTimeout:%d", pSyncNode->vgId, tsElectInterval,
tsHeartbeatInterval, tsHeartbeatTimeout);
sNInfo(pSyncNode, "sync open, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
return pSyncNode;
_error:
......@@ -1251,6 +1255,8 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
}
sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
pSyncNode->pNewNodeReceiver);
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
pSyncNode->pNewNodeReceiver = NULL;
}
......@@ -1295,15 +1301,15 @@ void syncNodeClose(SSyncNode* pSyncNode) {
syncNodeStopHeartbeatTimer(pSyncNode);
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] != NULL) {
sSTrace((pSyncNode->senders)[i], "snapshot sender destroy while close, data:%p", (pSyncNode->senders)[i]);
if (pSyncNode->senders[i] != NULL) {
sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[i]);
if (snapshotSenderIsStart((pSyncNode->senders)[i])) {
snapshotSenderStop((pSyncNode->senders)[i], false);
if (snapshotSenderIsStart(pSyncNode->senders[i])) {
snapshotSenderStop(pSyncNode->senders[i], false);
}
snapshotSenderDestroy((pSyncNode->senders)[i]);
(pSyncNode->senders)[i] = NULL;
snapshotSenderDestroy(pSyncNode->senders[i]);
pSyncNode->senders[i] = NULL;
}
}
......@@ -1312,6 +1318,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
}
sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
pSyncNode->pNewNodeReceiver = NULL;
}
......@@ -1382,8 +1389,7 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
return ret;
}
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
int32_t electMS;
if (pSyncNode->pRaftCfg->isStandBy) {
......@@ -1391,11 +1397,11 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
} else {
electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
}
ret = syncNodeRestartElectTimer(pSyncNode, electMS);
(void)syncNodeRestartElectTimer(pSyncNode, electMS);
sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
electMS);
return ret;
}
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
......@@ -1455,23 +1461,20 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
return 0;
}
// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilRaftId2EpSet(destRaftId, &epSet);
if (pSyncNode->syncSendMSg != NULL) {
// htonl
syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1;
pSyncNode->syncSendMSg(&epSet, pMsg);
return pSyncNode->syncSendMSg(&epSet, pMsg);
} else {
sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
rpcFreeCont(pMsg->pCont);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
return 0;
}
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
......@@ -1586,7 +1589,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i];
oldSenders[i] = pSyncNode->senders[i];
sSTrace(oldSenders[i], "snapshot sender save old");
}
......@@ -1625,7 +1628,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// clear new
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
(pSyncNode->senders)[i] = NULL;
pSyncNode->senders[i] = NULL;
}
// reset new
......@@ -1640,16 +1643,16 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
sNTrace(pSyncNode, "snapshot sender reset for: %" PRId64 ", newIndex:%d, %s:%d, %p",
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);
(pSyncNode->senders)[i] = oldSenders[j];
pSyncNode->senders[i] = oldSenders[j];
oldSenders[j] = NULL;
reset = true;
// reset replicaIndex
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
(pSyncNode->senders)[i]->replicaIndex = i;
int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
pSyncNode->senders[i]->replicaIndex = i;
sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
i, host, port, (pSyncNode->senders)[i], reset);
i, host, port, pSyncNode->senders[i], reset);
break;
}
......@@ -1658,18 +1661,23 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// create new
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
sSTrace((pSyncNode->senders)[i], "snapshot sender create new while reconfig, data:%p", (pSyncNode->senders)[i]);
if (pSyncNode->senders[i] == NULL) {
pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
if (pSyncNode->senders[i] == NULL) {
// will be created later while send snapshot
sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
} else {
sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
}
} else {
sSTrace((pSyncNode->senders)[i], "snapshot sender already exist, data:%p", (pSyncNode->senders)[i]);
sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
}
}
// free old
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (oldSenders[i] != NULL) {
sNTrace(pSyncNode, "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
snapshotSenderDestroy(oldSenders[i]);
oldSenders[i] = NULL;
}
......@@ -1844,8 +1852,8 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
if (pMySender != NULL) {
for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
}
}
(pMySender->privateTerm) += 100;
......
......@@ -566,7 +566,9 @@ _out:
return ret;
}
int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
void syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
if (pMgr == NULL) return;
ASSERT(pMgr->startIndex >= 0);
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0]));
......@@ -576,7 +578,6 @@ int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
pMgr->endIndex = 0;
pMgr->restored = false;
pMgr->retryBackoff = 0;
return 0;
}
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册