未验证 提交 b67e27bb 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18119 from taosdata/fix/TD-20372

fix(sync): fix AddressSanitizer error: TD-20372
...@@ -37,43 +37,6 @@ ...@@ -37,43 +37,6 @@
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>> // /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
// //
// only start once
static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm,
SyncAppendEntriesReply* pMsg) {
if (beginIndex > endIndex) {
sNError(ths, "snapshot param error, start:%" PRId64 ", end:%" PRId64, beginIndex, endIndex);
return;
}
// get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
ASSERT(pSender != NULL);
if (snapshotSenderIsStart(pSender)) {
sSError(pSender, "snapshot sender already start");
return;
}
SSnapshot snapshot = {
.data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID};
void* pReader = NULL;
SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex};
int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader);
ASSERT(code == 0);
#if 0
if (pMsg->privateTerm < pSender->privateTerm) {
ASSERT(pReader != NULL);
snapshotSenderStart(pSender, readerParam, snapshot, pReader);
} else {
if (pReader != NULL) {
ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader);
}
}
#endif
}
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0; int32_t ret = 0;
SyncAppendEntriesReply* pMsg = pRpcMsg->pCont; SyncAppendEntriesReply* pMsg = pRpcMsg->pCont;
......
...@@ -197,7 +197,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -197,7 +197,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
syncMeta.isWeek = pEntry->isWeak; syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum; syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term; syncMeta.term = pEntry->term;
int64_t tsWriteBegin = taosGetTimestampMs();
index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
int64_t tsWriteEnd = taosGetTimestampMs();
int64_t tsElapsed = tsWriteEnd - tsWriteBegin;
if (index < 0) { if (index < 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
...@@ -210,8 +215,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -210,8 +215,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
} }
pEntry->index = index; pEntry->index = index;
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s", pEntry->index, sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);
return 0; return 0;
} }
...@@ -236,7 +241,11 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR ...@@ -236,7 +241,11 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
taosThreadMutexLock(&(pData->mutex)); taosThreadMutexLock(&(pData->mutex));
int64_t tsBegin = taosGetTimestampMs();
code = walReadVer(pWalHandle, index); code = walReadVer(pWalHandle, index);
int64_t tsEnd = taosGetTimestampMs();
int64_t tsElapsed = tsEnd - tsBegin;
// code = walReadVerCached(pWalHandle, index); // code = walReadVerCached(pWalHandle, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
......
...@@ -140,9 +140,10 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { ...@@ -140,9 +140,10 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
int32_t ret = 0; int32_t ret = 0;
SyncAppendEntries* pMsg = pRpcMsg->pCont; SyncAppendEntries* pMsg = pRpcMsg->pCont;
if (pMsg == NULL) {
syncLogSendAppendEntries(pSyncNode, pMsg, ""); sError("vgId:%d, sync-append-entries msg is NULL", pSyncNode->vgId);
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); return 0;
}
SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId); SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId);
if (pState == NULL) { if (pState == NULL) {
...@@ -150,8 +151,19 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI ...@@ -150,8 +151,19 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
return 0; return 0;
} }
// save index, otherwise pMsg will be free by rpc
SyncIndex saveLastSendIndex = pState->lastSendIndex;
bool update = false;
if (pMsg->dataLen > 0) { if (pMsg->dataLen > 0) {
pState->lastSendIndex = pMsg->prevLogIndex + 1; saveLastSendIndex = pMsg->prevLogIndex + 1;
update = true;
}
syncLogSendAppendEntries(pSyncNode, pMsg, "");
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
if (update) {
pState->lastSendIndex = saveLastSendIndex;
pState->lastSendTime = taosGetTimestampMs(); pState->lastSendTime = taosGetTimestampMs();
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册