未验证 提交 5a941284 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #14965 from taosdata/feature/3.0_mhli

refactor(sync): modify elect/heartbeat timer
...@@ -61,7 +61,7 @@ typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); ...@@ -61,7 +61,7 @@ typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
typedef struct SQWorker SQHandle; typedef struct SQWorker SQHandle;
typedef struct { typedef struct {
const char * name; const char *name;
MndInitFp initFp; MndInitFp initFp;
MndCleanupFp cleanupFp; MndCleanupFp cleanupFp;
} SMnodeStep; } SMnodeStep;
...@@ -70,7 +70,7 @@ typedef struct { ...@@ -70,7 +70,7 @@ typedef struct {
int64_t showId; int64_t showId;
ShowRetrieveFp retrieveFps[TSDB_MGMT_TABLE_MAX]; ShowRetrieveFp retrieveFps[TSDB_MGMT_TABLE_MAX];
ShowFreeIterFp freeIterFps[TSDB_MGMT_TABLE_MAX]; ShowFreeIterFp freeIterFps[TSDB_MGMT_TABLE_MAX];
SCacheObj * cache; SCacheObj *cache;
} SShowMgmt; } SShowMgmt;
typedef struct { typedef struct {
...@@ -90,6 +90,7 @@ typedef struct { ...@@ -90,6 +90,7 @@ typedef struct {
SReplica replica; SReplica replica;
int32_t errCode; int32_t errCode;
int32_t transId; int32_t transId;
int8_t leaderTransferFinish;
} SSyncMgmt; } SSyncMgmt;
typedef struct { typedef struct {
...@@ -107,14 +108,14 @@ typedef struct SMnode { ...@@ -107,14 +108,14 @@ typedef struct SMnode {
bool stopped; bool stopped;
bool restored; bool restored;
bool deploy; bool deploy;
char * path; char *path;
int64_t checkTime; int64_t checkTime;
SSdb * pSdb; SSdb *pSdb;
SArray * pSteps; SArray *pSteps;
SQHandle * pQuery; SQHandle *pQuery;
SHashObj * infosMeta; SHashObj *infosMeta;
SHashObj * perfsMeta; SHashObj *perfsMeta;
SWal * pWal; SWal *pWal;
SShowMgmt showMgmt; SShowMgmt showMgmt;
SProfileMgmt profileMgmt; SProfileMgmt profileMgmt;
STelemMgmt telemMgmt; STelemMgmt telemMgmt;
......
...@@ -368,7 +368,18 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { ...@@ -368,7 +368,18 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
void mndPreClose(SMnode *pMnode) { void mndPreClose(SMnode *pMnode) {
if (pMnode != NULL) { if (pMnode != NULL) {
atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
syncLeaderTransfer(pMnode->syncMgmt.sync); syncLeaderTransfer(pMnode->syncMgmt.sync);
/*
mDebug("vgId:1, mnode start leader transfer");
// wait for leader transfer finish
while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
taosMsleep(10);
mDebug("vgId:1, mnode waiting for leader transfer");
}
mDebug("vgId:1, mnode finish leader transfer");
*/
} }
} }
......
...@@ -154,6 +154,12 @@ int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int ...@@ -154,6 +154,12 @@ int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int
return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len); return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len);
} }
/*
 * Leader-transfer callback for the mnode sync FSM.
 *
 * Sets syncMgmt.leaderTransferFinish on the mnode stored in pFsm->data to 1
 * and writes a debug log line. pMsg and cbMeta are not used.
 */
void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  // pFsm->data holds the owning SMnode; publish the flag atomically.
  atomic_store_8(&(((SMnode *)pFsm->data)->syncMgmt.leaderTransferFinish), 1);
  mDebug("vgId:1, mnd leader transfer finish");
}
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pMnode; pFsm->data = pMnode;
...@@ -161,6 +167,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { ...@@ -161,6 +167,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
pFsm->FpPreCommitCb = NULL; pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL; pFsm->FpRollBackCb = NULL;
pFsm->FpRestoreFinishCb = mndRestoreFinish; pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpLeaderTransferCb = mndLeaderTransfer;
pFsm->FpReConfigCb = mndReConfig; pFsm->FpReConfigCb = mndReConfig;
pFsm->FpGetSnapshot = mndSyncGetSnapshot; pFsm->FpGetSnapshot = mndSyncGetSnapshot;
pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo; pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
......
...@@ -536,6 +536,10 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void * ...@@ -536,6 +536,10 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *
#endif #endif
} }
/*
 * Leader-transfer callback for the vnode sync FSM.
 *
 * Intentionally a no-op: the vnode takes no action on this notification.
 */
static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  // The casts mark the parameters as deliberately unused, replacing a local
  // that was assigned from pFsm->data and never read.
  (void)pFsm;
  (void)pMsg;
  (void)cbMeta;
}
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pVnode; pFsm->data = pVnode;
...@@ -544,6 +548,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { ...@@ -544,6 +548,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = NULL; pFsm->FpRestoreFinishCb = NULL;
pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
pFsm->FpReConfigCb = vnodeSyncReconfig; pFsm->FpReConfigCb = vnodeSyncReconfig;
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead; pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
...@@ -579,8 +584,8 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { ...@@ -579,8 +584,8 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
} }
setPingTimerMS(pVnode->sync, 5000); setPingTimerMS(pVnode->sync, 5000);
setElectTimerMS(pVnode->sync, 500); setElectTimerMS(pVnode->sync, 1300);
setHeartbeatTimerMS(pVnode->sync, 100); setHeartbeatTimerMS(pVnode->sync, 900);
return 0; return 0;
} }
......
...@@ -253,6 +253,7 @@ bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg); ...@@ -253,6 +253,7 @@ bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg);
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
// for debug -------------- // for debug --------------
void syncNodePrint(SSyncNode* pObj); void syncNodePrint(SSyncNode* pObj);
......
...@@ -477,6 +477,13 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) { ...@@ -477,6 +477,13 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
// leader transfer
if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
int32_t code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
ASSERT(code == 0);
}
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0}; SFsmCbMeta cbMeta = {0};
......
...@@ -2598,9 +2598,13 @@ const char* syncStr(ESyncState state) { ...@@ -2598,9 +2598,13 @@ const char* syncStr(ESyncState state) {
} }
} }
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeEventLog(ths, "I am not follower, can not do leader transfer");
return 0;
}
syncNodeEventLog(ths, "do leader transfer"); syncNodeEventLog(ths, "do leader transfer");
bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId)); bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
...@@ -2811,11 +2815,14 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ...@@ -2811,11 +2815,14 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ASSERT(code == 0); ASSERT(code == 0);
} }
#if 0
// execute in pre-commit
// leader transfer // leader transfer
if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) { if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry); code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
ASSERT(code == 0); ASSERT(code == 0);
} }
#endif
// restore finish // restore finish
// if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
sql connect
# Verify the initial mnode list: exactly one row, column 0 equal to 1, column 2 equal to leader.
print =============== show mnodes
sql show mnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data02 != leader then
return -1
endi
print =============== create dnodes
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sleep 3000
print =============== create mnode 2, 3
sql create mnode on dnode 2
sql create mnode on dnode 3
sleep 3000
print =============== create user
sql create user user1 PASS 'user1'
sql show users
if $rows != 2 then
return -1
endi
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
print ===> $data20 $data21 $data22 $data23 $data24 $data25
print ===> $data30 $data31 $data32 $data33 $data34 $data35
if $rows != 4 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
if $data(3)[4] != ready then
goto step1
endi
if $data(4)[4] != ready then
goto step1
endi
$replica = 3
$vgroups = 1
print ============= create database
sql create database db replica $replica vgroups $vgroups
$loop_cnt = 0
check_db_ready:
# Poll the database list every 200 ms; fail once the counter reaches 100.
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 100 then
print ====> db not ready!
return -1
endi
sql show databases
print ===> rows: $rows
# Dump columns 0 through 19 of row 2 for diagnosis.
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][10] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19]
if $rows != 3 then
return -1
endi
# Column 19 of row 2 is compared against ready; retry until it matches.
if $data[2][19] != ready then
goto check_db_ready
endi
sql use db
$loop_cnt = 0
check_vg_ready:
# Poll the vgroup list every 200 ms; fail once the counter reaches 300.
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 300 then
print ====> vgroups not ready!
return -1
endi
sql show vgroups
print ===> rows: $rows
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11]
if $rows != $vgroups then
return -1
endi
# Row 0 only: columns 4, 6 and 8 hold the replica roles, columns 3, 5 and 7 the matching dnode ids.
# Continue only when one replica is leader and the other two are followers; otherwise poll again.
if $data[0][4] == leader then
if $data[0][6] != follower then
goto check_vg_ready
endi
if $data[0][8] != follower then
goto check_vg_ready
endi
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
elif $data[0][6] == leader then
if $data[0][4] != follower then
goto check_vg_ready
endi
if $data[0][8] != follower then
goto check_vg_ready
endi
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
elif $data[0][8] == leader then
if $data[0][4] != follower then
goto check_vg_ready
endi
if $data[0][6] != follower then
goto check_vg_ready
endi
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
else
goto check_vg_ready
endi
vg_ready:
print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
sql create table ct1 using stb tags(1000)
print ====> step1 insert 1000 records
$N = 1000
$count = 0
while $count < $N
$ms = 1591200000000 + $count
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
$count = $count + 1
endw
print ====> step2 sleep 20s, checking data
sleep 20000
print ====> step3 sleep 30s, kill leader
sleep 30000
print ====> step4 insert 1000 records
$N = 1000
$count = 0
while $count < $N
$ms = 1591201000000 + $count
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
$count = $count + 1
endw
print ====> step5 sleep 20s, checking data
sleep 20000
print ====> step6 stop all
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
print ====> step7 start all
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
print ====> step8 sleep 20s, checking data
sleep 20000
print ====> step9 insert 1000 records
$N = 1000
$count = 0
while $count < $N
$ms = 1591202000000 + $count
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
$count = $count + 1
endw
print ====> step10 sleep 20s, checking data
sleep 20000
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
print ===> $data20 $data21 $data22 $data23 $data24 $data25
print ===> $data30 $data31 $data32 $data33 $data34 $data35
if $rows != 4 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
if $data(3)[4] != ready then
goto step1
endi
if $data(4)[4] != ready then
goto step1
endi
$replica = 3
$vgroups = 6
print ============= create database
sql create database db replica $replica vgroups $vgroups
$loop_cnt = 0
check_db_ready:
# Poll the database list every 200 ms; fail once the counter reaches 100.
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 100 then
print ====> db not ready!
return -1
endi
sql show databases
print ===> rows: $rows
# Dump columns 0 through 19 of row 2 for diagnosis.
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][10] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19]
if $rows != 3 then
return -1
endi
# Column 19 of row 2 is compared against ready; retry until it matches.
if $data[2][19] != ready then
goto check_db_ready
endi
sql use db
$loop_cnt = 0
check_vg_ready:
# Poll the vgroup list every 200 ms; fail once the counter reaches 300.
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 300 then
print ====> vgroups not ready!
return -1
endi
sql show vgroups
print ===> rows: $rows
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11]
if $rows != $vgroups then
return -1
endi
# Row 0 only: columns 4, 6 and 8 hold the replica roles, columns 3, 5 and 7 the matching dnode ids.
# NOTE(review): only the first vgroup row is inspected even though several vgroups are created - confirm intent.
# Continue only when one replica is leader and the other two are followers; otherwise poll again.
if $data[0][4] == leader then
if $data[0][6] != follower then
goto check_vg_ready
endi
if $data[0][8] != follower then
goto check_vg_ready
endi
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
elif $data[0][6] == leader then
if $data[0][4] != follower then
goto check_vg_ready
endi
if $data[0][8] != follower then
goto check_vg_ready
endi
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
elif $data[0][8] == leader then
if $data[0][4] != follower then
goto check_vg_ready
endi
if $data[0][6] != follower then
goto check_vg_ready
endi
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
else
goto check_vg_ready
endi
vg_ready:
print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
sql create table ct1 using stb tags(1000)
print ====> step1 insert 1000 records
$N = 1000
$count = 0
while $count < $N
$ms = 1591200000000 + $count
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
$count = $count + 1
endw
print ====> step2 sleep 20s, checking data
sleep 20000
print ====> step3 sleep 30s, kill leader
sleep 30000
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册