未验证 提交 4c775b11 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #15843 from taosdata/feature/3.0_mhli

refactor(sync): speed up replicate
......@@ -192,9 +192,11 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartNowHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartNowHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms);
// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
......
......@@ -1322,10 +1322,10 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
return ret;
}
int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms) {
int32_t ret = 0;
if (syncEnvIsStart()) {
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, 1, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer);
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, ms, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
} else {
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
......@@ -1333,13 +1333,18 @@ int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode) {
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", 1);
snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", ms);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
return ret;
}
int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = syncNodeStartHeartbeatTimerMS(pSyncNode, 1);
return ret;
}
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
......@@ -1363,6 +1368,12 @@ int32_t syncNodeRestartNowHeartbeatTimer(SSyncNode* pSyncNode) {
return 0;
}
int32_t syncNodeRestartNowHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms) {
syncNodeStopHeartbeatTimer(pSyncNode);
syncNodeStartHeartbeatTimerMS(pSyncNode, ms);
return 0;
}
// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
......
......@@ -200,9 +200,23 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
// send msg
syncNodeAppendEntriesBatch(pSyncNode, pDestId, pMsg);
syncAppendEntriesBatchDestroy(pMsg);
// speed up
if (pMsg->dataCount > 0 && pMsg->prevLogIndex < pSyncNode->commitIndex) {
ret = 1;
do {
char logBuf[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf), "speed up for %s:%d, pre-index:%ld", host, port, pMsg->prevLogIndex);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
}
}
return 0;
return ret;
}
int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
......@@ -309,7 +323,14 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
break;
}
syncNodeRestartHeartbeatTimer(pSyncNode);
if (ret > 0) {
// speed up replicate
int32_t ms = pSyncNode->heartbeatTimerMS < 50 ? pSyncNode->heartbeatTimerMS : 50;
syncNodeRestartNowHeartbeatTimerMS(pSyncNode, ms);
} else {
syncNodeRestartHeartbeatTimer(pSyncNode);
}
return ret;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册