未验证 提交 7722f880 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #15861 from taosdata/feature/3.0_mhli

refactor(sync): speed up replicate
......@@ -26,7 +26,10 @@ extern "C" {
extern bool gRaftDetailLog;
#define SYNC_RESP_TTL_MS 10000000
#define SYNC_RESP_TTL_MS 10000000
#define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
#define SYNC_SLOW_DOWN_RANGE 100
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0
......@@ -205,6 +208,7 @@ int32_t syncSetStandby(int64_t rid);
ESyncState syncGetMyRole(int64_t rid);
bool syncIsReady(int64_t rid);
const char* syncGetMyRoleStr(int64_t rid);
bool syncRestoreFinish(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid);
SyncGroupId syncGetVgId(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
......
......@@ -359,11 +359,11 @@ typedef enum ELogicConditionType {
#define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF
#define TSDB_DB_MIN_WAL_RETENTION_PERIOD -1
#define TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD 0
#define TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD (24 * 60 * 60 * 2)
#define TSDB_DB_MIN_WAL_RETENTION_SIZE -1
#define TSDB_DEFAULT_DB_WAL_RETENTION_SIZE 0
#define TSDB_DEFAULT_DB_WAL_RETENTION_SIZE -1
#define TSDB_DB_MIN_WAL_ROLL_PERIOD 0
#define TSDB_DEFAULT_DB_WAL_ROLL_PERIOD 0
#define TSDB_DEFAULT_DB_WAL_ROLL_PERIOD (24 * 60 * 60 * 1)
#define TSDB_DB_MIN_WAL_SEGMENT_SIZE 0
#define TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE 0
......
......@@ -730,7 +730,8 @@ void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
bool vnodeIsLeader(SVnode *pVnode) {
if (!syncIsReady(pVnode->sync)) {
vDebug("vgId:%d, vnode not ready", pVnode->config.vgId);
vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync),
syncRestoreFinish(pVnode->sync));
return false;
}
......
......@@ -162,6 +162,9 @@ typedef struct SSyncNode {
// is config changing
bool changing;
int64_t startTime;
int64_t lastReplicateTime;
} SSyncNode;
// open/close --------------
......@@ -186,16 +189,18 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode);
// timer control --------------
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimerNow(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartNowHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimerNow(SSyncNode* pSyncNode);
int32_t syncNodeRestartNowHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms);
// utils --------------
......
......@@ -55,7 +55,7 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode);
int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode);
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer);
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg);
......
......@@ -495,6 +495,18 @@ const char* syncGetMyRoleStr(int64_t rid) {
return s;
}
bool syncRestoreFinish(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return false;
}
ASSERT(rid == pSyncNode->rid);
bool restoreFinish = pSyncNode->restoreFinish;
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return restoreFinish;
}
SyncTerm syncGetMyTerm(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
......@@ -1086,6 +1098,10 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// start raft
// syncNodeBecomeFollower(pSyncNode);
int64_t timeNow = taosGetTimestampMs();
pSyncNode->startTime = timeNow;
pSyncNode->lastReplicateTime = timeNow;
syncNodeEventLog(pSyncNode, "sync open");
return pSyncNode;
......@@ -1303,7 +1319,7 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
return ret;
}
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
if (syncEnvIsStart()) {
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
......@@ -1322,26 +1338,21 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
return ret;
}
int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms) {
int32_t ret = 0;
if (syncEnvIsStart()) {
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, ms, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
} else {
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
}
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", ms);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
int32_t ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
return ret;
}
int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms) {
pSyncNode->heartbeatTimerMS = ms;
int32_t ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
return ret;
}
int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = syncNodeStartHeartbeatTimerMS(pSyncNode, 1);
int32_t syncNodeStartHeartbeatTimerNow(SSyncNode* pSyncNode) {
pSyncNode->heartbeatTimerMS = 1;
int32_t ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
return ret;
}
......@@ -1362,9 +1373,9 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
return 0;
}
int32_t syncNodeRestartNowHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t syncNodeRestartHeartbeatTimerNow(SSyncNode* pSyncNode) {
syncNodeStopHeartbeatTimer(pSyncNode);
syncNodeStartNowHeartbeatTimer(pSyncNode);
syncNodeStartHeartbeatTimerNow(pSyncNode);
return 0;
}
......@@ -1942,9 +1953,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode);
#if 0 // simon
syncNodeReplicate(pSyncNode);
#endif
syncMaybeAdvanceCommitIndex(pSyncNode);
} else {
......@@ -2126,9 +2134,6 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode);
#if 0 // simon
syncNodeReplicate(pSyncNode);
#endif
syncMaybeAdvanceCommitIndex(pSyncNode);
}
......@@ -2499,7 +2504,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
ASSERT(code == 0);
syncNodeReplicate(ths);
syncNodeReplicate(ths, false);
}
syncEntryDestory(pEntry);
......@@ -2572,7 +2577,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
// if mulit replica, start replicate right now
if (ths->replicaNum > 1) {
syncNodeReplicate(ths);
syncNodeReplicate(ths, false);
// pre commit
syncNodePreCommit(ths, pEntry, 0);
......@@ -2641,7 +2646,7 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
if (ths->replicaNum > 1) {
// if multi replica, start replicate right now
syncNodeReplicate(ths);
syncNodeReplicate(ths, false);
} else if (ths->replicaNum == 1) {
// one replica
......
......@@ -202,17 +202,19 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
syncAppendEntriesBatchDestroy(pMsg);
// speed up
if (pMsg->dataCount > 0 && pMsg->prevLogIndex < pSyncNode->commitIndex) {
if (pMsg->dataCount > 0 && pSyncNode->commitIndex - pMsg->prevLogIndex > SYNC_SLOW_DOWN_RANGE) {
ret = 1;
#if 0
do {
char logBuf[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf), "speed up for %s:%d, pre-index:%ld", host, port, pMsg->prevLogIndex);
snprintf(logBuf, sizeof(logBuf), "maybe speed up for %s:%d, pre-index:%ld", host, port, pMsg->prevLogIndex);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
#endif
}
}
......@@ -301,7 +303,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
return ret;
}
int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer) {
// start replicate
int32_t ret = 0;
......@@ -323,13 +325,38 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
break;
}
if (ret > 0) {
// start delay
int64_t timeNow = taosGetTimestampMs();
int64_t startDelay = timeNow - pSyncNode->startTime;
// replicate delay
int64_t replicateDelay = timeNow - pSyncNode->lastReplicateTime;
pSyncNode->lastReplicateTime = timeNow;
if (ret > 0 && isTimer && startDelay > SYNC_SPEED_UP_AFTER_MS) {
// speed up replicate
int32_t ms = pSyncNode->heartbeatTimerMS < 50 ? pSyncNode->heartbeatTimerMS : 50;
int32_t ms =
pSyncNode->heartbeatTimerMS < SYNC_SPEED_UP_HB_TIMER ? pSyncNode->heartbeatTimerMS : SYNC_SPEED_UP_HB_TIMER;
syncNodeRestartNowHeartbeatTimerMS(pSyncNode, ms);
#if 0
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "replicate speed up");
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
#endif
} else {
syncNodeRestartHeartbeatTimer(pSyncNode);
#if 0
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "replicate slow down");
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
#endif
}
return ret;
......
......@@ -58,7 +58,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
++(ths->heartbeatTimerCounter);
sInfo("vgId:%d, sync timeout, type:replicate count:%d, heartbeatTimerLogicClockUser:%ld", ths->vgId,
ths->heartbeatTimerCounter, ths->heartbeatTimerLogicClockUser);
syncNodeReplicate(ths);
syncNodeReplicate(ths, true);
}
} else {
sError("vgId:%d, unknown timeout-type:%d", ths->vgId, pMsg->timeoutType);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册