diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 94c4de3c5e75da0ae0ce1e87521bd85a95482884..875204c0a9799386c069872b6d9a9aace85fb230 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -37,43 +37,6 @@ // /\ UNCHANGED <> // -// only start once -static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm, - SyncAppendEntriesReply* pMsg) { - if (beginIndex > endIndex) { - sNError(ths, "snapshot param error, start:%" PRId64 ", end:%" PRId64, beginIndex, endIndex); - return; - } - - // get sender - SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); - ASSERT(pSender != NULL); - - if (snapshotSenderIsStart(pSender)) { - sSError(pSender, "snapshot sender already start"); - return; - } - - SSnapshot snapshot = { - .data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID}; - void* pReader = NULL; - SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex}; - int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader); - ASSERT(code == 0); - -#if 0 - if (pMsg->privateTerm < pSender->privateTerm) { - ASSERT(pReader != NULL); - snapshotSenderStart(pSender, readerParam, snapshot, pReader); - - } else { - if (pReader != NULL) { - ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader); - } - } -#endif -} - int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncAppendEntriesReply* pMsg = pRpcMsg->pCont; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5ceca36aaca602e5fee01f6d040316d48c86e562..8bccf6840a37d4fd8c4a6fb42f02fed24753358a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -385,7 +385,7 @@ bool syncIsReadyForRead(int64_t rid) { if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) { SSyncRaftEntry* pEntry = NULL; int32_t code = pSyncNode->pLogStore->syncLogGetEntry( - pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); + pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); if (code == 0 && pEntry != NULL) { if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { ready = true; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 7e4b18ab886fd8993a79ae06f9dab16d5b64d1cb..bc07781e5c2577668c58b246f8157dcceb36c95b 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -197,7 +197,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr syncMeta.isWeek = pEntry->isWeak; syncMeta.seqNum = pEntry->seqNum; syncMeta.term = pEntry->term; + + int64_t tsWriteBegin = taosGetTimestampMs(); index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); + int64_t tsWriteEnd = taosGetTimestampMs(); + int64_t tsElapsed = tsWriteEnd - tsWriteBegin; + if (index < 0) { int32_t err = terrno; const char* errStr = tstrerror(err); @@ -210,8 +215,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr } pEntry->index = index; - sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s", pEntry->index, - TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); + sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, + TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); return 0; } @@ -236,7 +241,11 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR taosThreadMutexLock(&(pData->mutex)); + int64_t tsBegin = taosGetTimestampMs(); code = walReadVer(pWalHandle, index); + int64_t tsEnd = taosGetTimestampMs(); + int64_t tsElapsed = tsEnd - tsBegin; + // code = walReadVerCached(pWalHandle, index); if (code != 0) { int32_t err = terrno; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index f09bac21396cd0bb4a56c0fbac46176f605ea457..59afe814ebcae0b6ad029201a5100f7120c3c9c5 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -140,9 +140,10 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncAppendEntries* pMsg = pRpcMsg->pCont; - - syncLogSendAppendEntries(pSyncNode, pMsg, ""); - syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); + if (pMsg == NULL) { + sError("vgId:%d, sync-append-entries msg is NULL", pSyncNode->vgId); + return 0; + } SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId); if (pState == NULL) { @@ -150,8 +151,19 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI return 0; } + // save index, otherwise pMsg will be free by rpc + SyncIndex saveLastSendIndex = pState->lastSendIndex; + bool update = false; if (pMsg->dataLen > 0) { - pState->lastSendIndex = pMsg->prevLogIndex + 1; + saveLastSendIndex = pMsg->prevLogIndex + 1; + update = true; + } + + syncLogSendAppendEntries(pSyncNode, pMsg, ""); + syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); + + if (update) { + pState->lastSendIndex = saveLastSendIndex; pState->lastSendTime = taosGetTimestampMs(); }