提交 5b6c7bc3 编写于 作者: M Minghao Li

refactor(sync): add leader transfer callback

上级 e3eaddac
...@@ -50,8 +50,8 @@ extern "C" { ...@@ -50,8 +50,8 @@ extern "C" {
// clang-format on // clang-format on
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
typedef int32_t (*MndMsgFp)(SRpcMsg *pMsg); typedef int32_t (*MndMsgFp)(SRpcMsg *pMsg);
typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef int32_t (*MndInitFp)(SMnode *pMnode);
...@@ -61,7 +61,7 @@ typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); ...@@ -61,7 +61,7 @@ typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
typedef struct SQWorker SQHandle; typedef struct SQWorker SQHandle;
typedef struct { typedef struct {
const char * name; const char *name;
MndInitFp initFp; MndInitFp initFp;
MndCleanupFp cleanupFp; MndCleanupFp cleanupFp;
} SMnodeStep; } SMnodeStep;
...@@ -70,7 +70,7 @@ typedef struct { ...@@ -70,7 +70,7 @@ typedef struct {
int64_t showId; int64_t showId;
ShowRetrieveFp retrieveFps[TSDB_MGMT_TABLE_MAX]; ShowRetrieveFp retrieveFps[TSDB_MGMT_TABLE_MAX];
ShowFreeIterFp freeIterFps[TSDB_MGMT_TABLE_MAX]; ShowFreeIterFp freeIterFps[TSDB_MGMT_TABLE_MAX];
SCacheObj * cache; SCacheObj *cache;
} SShowMgmt; } SShowMgmt;
typedef struct { typedef struct {
...@@ -84,12 +84,13 @@ typedef struct { ...@@ -84,12 +84,13 @@ typedef struct {
} STelemMgmt; } STelemMgmt;
typedef struct { typedef struct {
tsem_t syncSem; tsem_t syncSem;
int64_t sync; int64_t sync;
bool standby; bool standby;
SReplica replica; SReplica replica;
int32_t errCode; int32_t errCode;
int32_t transId; int32_t transId;
int8_t leaderTransferFinish;
} SSyncMgmt; } SSyncMgmt;
typedef struct { typedef struct {
...@@ -107,14 +108,14 @@ typedef struct SMnode { ...@@ -107,14 +108,14 @@ typedef struct SMnode {
bool stopped; bool stopped;
bool restored; bool restored;
bool deploy; bool deploy;
char * path; char *path;
int64_t checkTime; int64_t checkTime;
SSdb * pSdb; SSdb *pSdb;
SArray * pSteps; SArray *pSteps;
SQHandle * pQuery; SQHandle *pQuery;
SHashObj * infosMeta; SHashObj *infosMeta;
SHashObj * perfsMeta; SHashObj *perfsMeta;
SWal * pWal; SWal *pWal;
SShowMgmt showMgmt; SShowMgmt showMgmt;
SProfileMgmt profileMgmt; SProfileMgmt profileMgmt;
STelemMgmt telemMgmt; STelemMgmt telemMgmt;
......
...@@ -367,8 +367,17 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { ...@@ -367,8 +367,17 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
} }
void mndPreClose(SMnode *pMnode) { void mndPreClose(SMnode *pMnode) {
mDebug("vgId:1, mnode pre-close");
if (pMnode != NULL) { if (pMnode != NULL) {
atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
syncLeaderTransfer(pMnode->syncMgmt.sync); syncLeaderTransfer(pMnode->syncMgmt.sync);
mDebug("vgId:1, wait for mnode leader transfer");
// wait for leader transfer finish
while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
taosMsleep(10);
}
mDebug("vgId:1, mnode leader transfer finish");
} }
} }
......
...@@ -153,6 +153,12 @@ int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int ...@@ -153,6 +153,12 @@ int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int
return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len); return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len);
} }
void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SMnode *pMnode = pFsm->data;
atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 1);
mDebug("vgId:1, mnd leader transfer finish");
}
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pMnode; pFsm->data = pMnode;
...@@ -160,6 +166,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { ...@@ -160,6 +166,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
pFsm->FpPreCommitCb = NULL; pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL; pFsm->FpRollBackCb = NULL;
pFsm->FpRestoreFinishCb = mndRestoreFinish; pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpLeaderTransferCb = mndLeaderTransfer;
pFsm->FpReConfigCb = mndReConfig; pFsm->FpReConfigCb = mndReConfig;
pFsm->FpGetSnapshot = mndSyncGetSnapshot; pFsm->FpGetSnapshot = mndSyncGetSnapshot;
pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo; pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
......
...@@ -536,6 +536,10 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void * ...@@ -536,6 +536,10 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *
#endif #endif
} }
static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SVnode *pVnode = pFsm->data;
}
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pVnode; pFsm->data = pVnode;
...@@ -544,6 +548,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { ...@@ -544,6 +548,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = NULL; pFsm->FpRestoreFinishCb = NULL;
pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
pFsm->FpReConfigCb = vnodeSyncReconfig; pFsm->FpReConfigCb = vnodeSyncReconfig;
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead; pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
sql connect
print =============== show dnodes
sql show mnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data02 != leader then
return -1
endi
print =============== create dnodes
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sleep 3000
print =============== create mnode 2
sql create mnode on dnode 2
sql create mnode on dnode 3
sleep 3000
print =============== create user
sql create user user1 PASS 'user1'
sql show users
if $rows != 2 then
return -1
endi
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册