diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 47eab279a5cc94ef1b2b6d0f34a2f2e48978e9af..5a1acb47314f91d2ca4f2e247b997189da25ee56 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -89,7 +89,7 @@ int32_t syncStart(int64_t rid) { } if (syncNodeRestore(pSyncNode) < 0) { - sError("vgId:%d, failed to restore raft log buffer since %s", pSyncNode->vgId, terrstr()); + sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr()); goto _err; } @@ -847,7 +847,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { // create raft log ring buffer pSyncNode->pLogBuf = syncLogBufferCreate(); if (pSyncNode->pLogBuf == NULL) { - sError("failed to init log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId); + sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId); goto _error; } @@ -1060,7 +1060,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { // init log buffer if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) { - sError("vgId:%d, failed to init raft log buffer since %s", pSyncNode->vgId, terrstr()); + sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr()); goto _error; } @@ -2239,7 +2239,7 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { // append to log buffer if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { - sError("vgId:%d, failed to enqueue log buffer. index:%" PRId64 "", ths->vgId, pEntry->index); + sError("vgId:%d, failed to enqueue sync log buffer. index:%" PRId64 "", ths->vgId, pEntry->index); return -1; } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index f1d401759b94c7b6bcc1db606ee8020bfad8019b..cfea6e8bc6e563ffefd6e4ac2b93bee61f6c1616 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -38,7 +38,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt SyncIndex index = pEntry->index; if (index - pBuf->startIndex >= pBuf->size) { - sError("vgId:%d, failed to append due to log buffer full. index:%" PRId64 "", pNode->vgId, index); + sError("vgId:%d, failed to append due to sync log buffer full. index:%" PRId64 "", pNode->vgId, index); goto _out; } @@ -49,7 +49,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt // initial log buffer with at least one item, e.g. commitIndex SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem; - ASSERT(pMatch != NULL && "no matched raft log entry"); + ASSERT(pMatch != NULL && "no matched log entry"); ASSERT(pMatch->index + 1 == index); SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; @@ -273,8 +273,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf); if (index <= pBuf->commitIndex) { - sTrace("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 - " %" PRId64 " %" PRId64 ", %" PRId64 ")", + sTrace("vgId:%d, already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 + " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); SyncTerm term = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index + 1); @@ -286,15 +286,15 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt } if (index - pBuf->startIndex >= pBuf->size) { - sInfo("vgId:%d, raft entry out of buffer capacity. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 - " %" PRId64 " %" PRId64 ", %" PRId64 ")", + sInfo("vgId:%d, out of buffer range. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 + " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); goto _out; } if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) { - sInfo("vgId:%d, not ready to accept raft entry. index: %" PRId64 ", term: %" PRId64 ": prevterm: %" PRId64 + sInfo("vgId:%d, not ready to accept. index: %" PRId64 ", term: %" PRId64 ": prevterm: %" PRId64 " != lastmatch: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -309,7 +309,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt if (pEntry->term != pExist->term) { (void)syncLogBufferRollback(pBuf, index); } else { - sDebug("vgId:%d, duplicate raft entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + sDebug("vgId:%d, duplicate log entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -349,7 +349,7 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ASSERT(pEntry->index == lastVer + 1); if (pLogStore->syncLogAppendEntry(pLogStore, pEntry) < 0) { - sError("failed to append raft log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index, + sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index, pEntry->term); return -1; } @@ -392,7 +392,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p if (pMatch->term != prevLogTerm) { sInfo( - "vgId:%d, mismatching raft log entries encountered. " + "vgId:%d, mismatching sync log entries encountered. " "{ index:%" PRId64 ", term:%" PRId64 " } " "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ", @@ -411,8 +411,8 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p // persist if (syncLogStorePersist(pLogStore, pEntry) < 0) { - sError("vgId:%d, failed to persist raft log entry from log buffer since %s. index:%" PRId64, pNode->vgId, - terrstr(), pEntry->index); + sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(), + pEntry->index); goto _out; } ASSERT(pEntry->index == pBuf->matchIndex); @@ -482,15 +482,14 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm bool inBuf = false; if (commitIndex <= pBuf->commitIndex) { - sDebug("vgId:%d, stale commit update. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex, + sDebug("vgId:%d, stale commit index. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex, commitIndex); ret = 0; goto _out; } - sTrace("vgId:%d, log buffer info. role: %d, term: %" PRId64 ". start index:%" PRId64 ", commit index:%" PRId64 - ", match index: %" PRId64 ", end index:%" PRId64 "", - pNode->vgId, role, term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + sTrace("vgId:%d, commit. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), role: %d, term: %" PRId64, + pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, term); // execute in fsm for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) { @@ -513,7 +512,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) { - sError("vgId:%d, failed to execute raft entry in FSM. log index:%" PRId64 ", term:%" PRId64 "", vgId, + sError("vgId:%d, failed to execute sync log entry in FSM. log index:%" PRId64 ", term:%" PRId64 "", vgId, pEntry->index, pEntry->term); goto _out; } @@ -605,7 +604,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { bool barrier = false; if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { - sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: %" PRIx64 "", pNode->vgId, + sError("vgId:%d, failed to replicate sync log entry since %s. index: %" PRId64 ", dest: %" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); goto _out; } @@ -622,9 +621,8 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { _out: if (retried) { pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr); - sInfo("vgId:%d, resend %d raft log entries. dest: %" PRIx64 ", for indexes: %" PRId64 - " etc., maybe of term: %" PRId64 ", retryWaitMs: %" PRId64 ", repl mgr: [%" PRId64 " %" PRId64 ", %" PRId64 - ")", + sInfo("vgId:%d, resend %d sync log entries. dest: %" PRIx64 ", indexes: %" PRId64 " ..., likely term: %" PRId64 + ", retryWaitMs: %" PRId64 ", repl mgr: [%" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex); } @@ -645,8 +643,8 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ASSERT(pMgr->matchIndex == 0); if (pMsg->matchIndex < 0) { pMgr->restored = true; - sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), repl mgr(rs:%d): [%" PRId64 " %" PRId64 - ", %" PRId64 "), log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 + ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; @@ -661,21 +659,21 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) { pMgr->restored = true; - sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), repl mgr(rs:%d): [%" PRId64 " %" PRId64 - ", %" PRId64 "), log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 + ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } if (pMsg->success == false && pMsg->matchIndex >= pMsg->lastSendIndex) { - sError("vgId:%d, failed to rollback match index. peer: %s:%d, match index: %" PRId64 ", last sent: %" PRId64, pNode->vgId, - host, port, pMsg->matchIndex, pMsg->lastSendIndex); + sError("vgId:%d, failed to rollback match index. peer: %s:%d, match index: %" PRId64 ", last sent: %" PRId64, + pNode->vgId, host, port, pMsg->matchIndex, pMsg->lastSendIndex); if (syncNodeStartSnapshot(pNode, &destId) < 0) { sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port); return -1; } - sInfo("vgId:%d, snapshot replication to rollback. peer: %s:%d", pNode->vgId, host, port); + sInfo("vgId:%d, snapshot replication to peer %s:%d", pNode->vgId, host, port); return 0; } @@ -748,7 +746,8 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); if (pMsg->startTime != pMgr->peerStartTime) { - sInfo("vgId:%d, reset sync log repl mgr in append entries reply. peer: %" PRIx64 ", start time:%" PRId64 ", old:%" PRId64, + sInfo("vgId:%d, reset sync log repl mgr in appendlog reply. peer: %" PRIx64 ", start time:%" PRId64 + ", old:%" PRId64, pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); syncLogReplMgrReset(pMgr); pMgr->peerStartTime = pMsg->startTime; @@ -793,8 +792,6 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode return 0; } -_Atomic int64_t tsSendCnt = 0; - int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ASSERT(pMgr->restored); SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; @@ -825,7 +822,6 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p pMgr->states[pos].acked = false; pMgr->endIndex = index + 1; - tsSendCnt++; if (barrier) { break; } @@ -957,7 +953,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { (void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1); - sInfo("vgId:%d, reset sync log buffer. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, + sInfo("vgId:%d, reset sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->endIndex = pBuf->matchIndex + 1; @@ -1003,7 +999,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); if (pMgr) { - sInfo("vgId:%d, reset sync log repl mgr for peer: 0x%016" PRIx64 " since %s. index: %" PRId64, pNode->vgId, + sInfo("vgId:%d, reset sync log repl mgr of peer: %" PRIx64 " since %s. index: %" PRId64, pNode->vgId, pDestId->addr, terrstr(), index); (void)syncLogReplMgrReset(pMgr); }