未验证 提交 d192abbb 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #14814 from taosdata/feature/3.0_mhli

refactor(sync): do leader transfer
...@@ -199,6 +199,7 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -199,6 +199,7 @@ int32_t mndInitSync(SMnode *pMnode) {
} }
// decrease election timer // decrease election timer
setPingTimerMS(pMgmt->sync, 5000);
setElectTimerMS(pMgmt->sync, 600); setElectTimerMS(pMgmt->sync, 600);
setHeartbeatTimerMS(pMgmt->sync, 300); setHeartbeatTimerMS(pMgmt->sync, 300);
......
...@@ -569,7 +569,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { ...@@ -569,7 +569,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
return -1; return -1;
} }
setPingTimerMS(pVnode->sync, 3000); setPingTimerMS(pVnode->sync, 5000);
setElectTimerMS(pVnode->sync, 500); setElectTimerMS(pVnode->sync, 500);
setHeartbeatTimerMS(pVnode->sync, 100); setHeartbeatTimerMS(pVnode->sync, 100);
return 0; return 0;
......
...@@ -30,7 +30,7 @@ extern "C" { ...@@ -30,7 +30,7 @@ extern "C" {
#define TIMER_MAX_MS 0x7FFFFFFF #define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000 #define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 1000 #define PING_TIMER_MS 5000
#define ELECT_TIMER_MS_MIN 1300 #define ELECT_TIMER_MS_MIN 1300
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
......
...@@ -273,16 +273,8 @@ int32_t syncLeaderTransfer(int64_t rid) { ...@@ -273,16 +273,8 @@ int32_t syncLeaderTransfer(int64_t rid) {
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
if (pSyncNode->peersNum == 0) { int32_t ret = syncNodeLeaderTransfer(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
int32_t ret = syncLeaderTransferTo(rid, newLeader);
return ret; return ret;
} }
...@@ -293,25 +285,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { ...@@ -293,25 +285,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
return -1; return -1;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
int32_t ret = 0;
if (pSyncNode->replicaNum == 1) {
sError("only one replica, cannot drop leader");
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_ONE_REPLICA;
return -1;
}
SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
pMsg->newLeaderId.vgId = pSyncNode->vgId;
pMsg->newNodeInfo = newLeader;
ASSERT(pMsg != NULL);
SRpcMsg rpcMsg = {0};
syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
syncLeaderTransferDestroy(pMsg);
ret = syncNodePropose(pSyncNode, &rpcMsg, false); int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret; return ret;
} }
...@@ -337,6 +312,12 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { ...@@ -337,6 +312,12 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
return -1; return -1;
} }
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort); pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
pMsg->newLeaderId.vgId = pSyncNode->vgId; pMsg->newLeaderId.vgId = pSyncNode->vgId;
...@@ -1118,19 +1099,13 @@ void syncNodeStart(SSyncNode* pSyncNode) { ...@@ -1118,19 +1099,13 @@ void syncNodeStart(SSyncNode* pSyncNode) {
// Raft 3.6.2 Committing entries from previous terms // Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode); syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode);
} else {
return; syncNodeBecomeFollower(pSyncNode, "first start");
} }
syncNodeBecomeFollower(pSyncNode, "first start"); int32_t ret = 0;
ret = syncNodeStartPingTimer(pSyncNode);
// int32_t ret = 0; ASSERT(ret == 0);
// ret = syncNodeStartPingTimer(pSyncNode);
// ASSERT(ret == 0);
if (gRaftDetailLog) {
syncNodeLog2("==state change become leader immediately==", pSyncNode);
}
} }
void syncNodeStartStandBy(SSyncNode* pSyncNode) { void syncNodeStartStandBy(SSyncNode* pSyncNode) {
...@@ -1147,8 +1122,6 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { ...@@ -1147,8 +1122,6 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
void syncNodeClose(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) {
syncNodeEventLog(pSyncNode, "sync close"); syncNodeEventLog(pSyncNode, "sync close");
// leader transfer
int32_t ret; int32_t ret;
ASSERT(pSyncNode != NULL); ASSERT(pSyncNode != NULL);
...@@ -1183,14 +1156,6 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -1183,14 +1156,6 @@ void syncNodeClose(SSyncNode* pSyncNode) {
pSyncNode->pNewNodeReceiver = NULL; pSyncNode->pNewNodeReceiver = NULL;
} }
/*
if (pSyncNode->pSnapshot != NULL) {
taosMemoryFree(pSyncNode->pSnapshot);
}
*/
// tsem_destroy(&pSyncNode->restoreSem);
// free memory in syncFreeNode // free memory in syncFreeNode
// taosMemoryFree(pSyncNode); // taosMemoryFree(pSyncNode);
} }
...@@ -1255,7 +1220,7 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { ...@@ -1255,7 +1220,7 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
&pSyncNode->pPingTimer); &pSyncNode->pPingTimer);
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
} else { } else {
sError("sync env is stop, syncNodeStartPingTimer"); sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
} }
return ret; return ret;
} }
...@@ -1276,7 +1241,7 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { ...@@ -1276,7 +1241,7 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
&pSyncNode->pElectTimer); &pSyncNode->pElectTimer);
atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
} else { } else {
sError("sync env is stop, syncNodeStartElectTimer"); sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
} }
return ret; return ret;
} }
...@@ -1316,7 +1281,7 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { ...@@ -1316,7 +1281,7 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
&pSyncNode->pHeartbeatTimer); &pSyncNode->pHeartbeatTimer);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
} else { } else {
sError("sync env is stop, syncNodeStartHeartbeatTimer"); sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
} }
return ret; return ret;
} }
...@@ -2643,7 +2608,7 @@ const char* syncStr(ESyncState state) { ...@@ -2643,7 +2608,7 @@ const char* syncStr(ESyncState state) {
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
syncNodeEventLog(ths, "begin leader transfer"); syncNodeEventLog(ths, "do leader transfer");
bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId)); bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
......
...@@ -17,6 +17,11 @@ ...@@ -17,6 +17,11 @@
#include "syncElection.h" #include "syncElection.h"
#include "syncReplication.h" #include "syncReplication.h"
// Periodic routine hook for a sync node.
// Emits a "timer routines ... " event log entry for `ths` and always
// reports success (0).
int32_t syncNodeTimerRoutine(SSyncNode* ths) {
  const int32_t code = 0;

  syncNodeEventLog(ths, "timer routines ... ");

  return code;
}
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
int32_t ret = 0; int32_t ret = 0;
syncTimeoutLog2("==syncNodeOnTimeoutCb==", pMsg); syncTimeoutLog2("==syncNodeOnTimeoutCb==", pMsg);
...@@ -24,8 +29,11 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { ...@@ -24,8 +29,11 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) { if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->pingTimerCounter); ++(ths->pingTimerCounter);
// syncNodePingAll(ths); // syncNodePingAll(ths);
syncNodePingPeers(ths); // syncNodePingPeers(ths);
syncNodeTimerRoutine(ths);
} }
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
...@@ -40,7 +48,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { ...@@ -40,7 +48,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
syncNodeReplicate(ths); syncNodeReplicate(ths);
} }
} else { } else {
sTrace("unknown timeoutType:%d", pMsg->timeoutType); sError("vgId:%d, unknown timeout-type:%d", ths->vgId, pMsg->timeoutType);
} }
return ret; return ret;
......
# Stop any dnodes left running, then deploy four dnodes (dnode1..dnode4).
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
# Set supportVnodes to 0 on dnode1 (presumably so dnode1 hosts no vnodes and
# the 3 replicas land on dnode2..dnode4 -- confirm against cfg semantics).
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
# Start all four dnodes.
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
# Poll `show dnodes` until row 0, column 4 reads "ready".
# Each iteration sleeps 200 first; the script fails when the counter hits 10.
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
# Dump the first seven columns of the first four rows for diagnostics.
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
# Row 0, column 0 must be 1 (presumably the dnode id -- confirm); anything else aborts.
if $data[0][0] != 1 then
return -1
endi
if $data[0][4] != ready then
goto check_dnode_ready
endi
# Register three more dnodes (ports 7200, 7300, 7400) on the same host.
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
# Poll `show dnodes` until column 4 reads "ready" in all four rows.
# Each iteration sleeps 200 first; the script fails when the counter hits 10.
$loop_cnt = 0
check_dnode_ready_1:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnodes not ready!
return -1
endi
sql show dnodes
# Dump the first seven columns of the first four rows for diagnostics.
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
# Any row not yet "ready" restarts the poll.
if $data[0][4] != ready then
goto check_dnode_ready_1
endi
if $data[1][4] != ready then
goto check_dnode_ready_1
endi
if $data[2][4] != ready then
goto check_dnode_ready_1
endi
if $data[3][4] != ready then
goto check_dnode_ready_1
endi
# Create database `db` with 3 replicas and a single vgroup.
$replica = 3
$vgroups = 1
print ============= create database
sql create database db replica $replica vgroups $vgroups

# Poll `show databases` until row 2, column 19 reads "ready".
# Each iteration sleeps 200 first; the script fails when the counter hits 100.
$loop_cnt = 0
check_db_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 100 then
  print ====> db not ready!
  return -1
endi
sql show databases
print ===> rows: $rows
# Dump columns 0..19 of row 2 for diagnostics. Column 10 was previously
# missing from this dump (column 6 was printed twice in its place).
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][10] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19]
# Exactly three rows are expected; any other count aborts the test.
if $rows != 3 then
  return -1
endi
if $data[2][19] != ready then
  goto check_db_ready
endi
sql use db

# Wait until the vgroup in row 0 reports one leader and two followers.
# Replica roles are read from columns 4, 6 and 8; the matching dnode is read
# from columns 3, 5 and 7. Each iteration sleeps 200 first; the script fails
# when the counter hits 300.
#
# The poll only ends (goto vg_ready) once one replica is leader AND the other
# two are followers. Any other layout -- no leader yet, or a leader whose
# peers have not settled as followers -- re-polls instead of falling through.
$loop_cnt = 0
check_vg_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 300 then
  print ====> vgroups not ready!
  return -1
endi
sql show vgroups
print ===> rows: $rows
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11]
# The row count must equal the number of vgroups requested at creation.
if $rows != $vgroups then
  return -1
endi
if $data[0][4] == leader then
  if $data[0][6] == follower then
    if $data[0][8] == follower then
      print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
      goto vg_ready
    endi
  endi
elif $data[0][6] == leader then
  if $data[0][4] == follower then
    if $data[0][8] == follower then
      print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
      goto vg_ready
    endi
  endi
elif $data[0][8] == leader then
  if $data[0][4] == follower then
    if $data[0][6] == follower then
      print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
      goto vg_ready
    endi
  endi
endi
# Layout not complete yet: poll again.
goto check_vg_ready
vg_ready:
# Create super table `stb` and verify that `show stables` returns exactly one row.
print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
# Create child table `ct1` under `stb` with tag value 1000.
sql create table ct1 using stb tags(1000)
# Insert $N rows into ct1; each row's timestamp is 1591200000000 + $count,
# and c1 carries $count.
print ===> write 100 records
$N = 100
$count = 0
while $count < $N
$ms = 1591200000000 + $count
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
$count = $count + 1
endw
# Explicit flush is disabled; the script sleeps 3000 before shutting down
# (presumably to let the writes settle -- confirm).
#sql flush database db;
sleep 3000
# Stop all four dnodes with SIGINT.
print ===> stop dnode1 dnode2 dnode3 dnode4
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
# Restart all four dnodes and wait 3000 before querying.
print ===> start dnode1 dnode2 dnode3 dnode4
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 3000
# After the restart, ct1 must still return exactly the 100 rows written above.
print =============== query data
sql connect
sql use db
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
# Final shutdown of the dnodes is currently disabled.
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
#system sh/exec.sh -n dnode2 -s stop -x SIGINT
#system sh/exec.sh -n dnode3 -s stop -x SIGINT
#system sh/exec.sh -n dnode4 -s stop -x SIGINT
#########################################################
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册