From 4af02b782808093c6094017fe851f47d058bec63 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 25 Mar 2021 16:06:25 +0800 Subject: [PATCH] [TD-3036]: syncdb replica; resync slave vnodes to keep consistent with master vnodes. --- src/dnode/src/dnodePeer.c | 1 + src/dnode/src/dnodeVMgmt.c | 9 +++++++++ src/dnode/src/dnodeVnodes.c | 5 +++-- src/inc/taosmsg.h | 3 ++- src/inc/vnode.h | 3 ++- src/mnode/inc/mnodeVgroup.h | 1 + src/mnode/src/mnodeDb.c | 38 ++++++++++++++++++++++++++++++++++++- src/mnode/src/mnodeVgroup.c | 38 +++++++++++++++++++++++++++++++++++++ src/sync/src/syncMain.c | 2 +- src/vnode/inc/vnodeMain.h | 3 ++- src/vnode/src/vnodeMain.c | 17 +++++++++++++++++ 11 files changed, 113 insertions(+), 7 deletions(-) diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index 79c60874f9..b8ce1c802b 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -43,6 +43,7 @@ int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue; diff --git a/src/dnode/src/dnodeVMgmt.c b/src/dnode/src/dnodeVMgmt.c index 1e428fc8b1..66c94bf675 100644 --- a/src/dnode/src/dnodeVMgmt.c +++ b/src/dnode/src/dnodeVMgmt.c @@ -30,6 +30,7 @@ static taos_queue tsVMgmtQueue = NULL; static void * dnodeProcessMgmtQueue(void *param); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); +static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); @@ -39,6 +40,7 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); int32_t dnodeInitVMgmt() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeProcessSyncVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; @@ -179,6 +181,13 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { } } +static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *rpcMsg) { + SSyncVnodeMsg *pSyncVnode = rpcMsg->pCont; + pSyncVnode->vgId = htonl(pSyncVnode->vgId); + + return vnodeSync(pSyncVnode->vgId); +} + static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { SDropVnodeMsg *pDrop = rpcMsg->pCont; pDrop->vgId = htonl(pDrop->vgId); diff --git a/src/dnode/src/dnodeVnodes.c b/src/dnode/src/dnodeVnodes.c index d00314fcbc..d96251cebe 100644 --- a/src/dnode/src/dnodeVnodes.c +++ b/src/dnode/src/dnodeVnodes.c @@ -202,10 +202,11 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { char clusterId[TSDB_CLUSTER_ID_LEN]; dnodeGetClusterId(clusterId); if (clusterId[0] != '\0') { - dError("exit zombie dropped dnode"); - exit(EXIT_FAILURE); + dError("exit zombie dropped dnode"); + exit(EXIT_FAILURE); } } + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); return; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b9f45bd0d9..914738338b 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -59,6 +59,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_SYNC_VNODE, "sync-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) @@ -389,7 +390,7 @@ typedef struct { typedef struct { int32_t vgId; -} SDropVnodeMsg; +} SDropVnodeMsg, SSyncVnodeMsg; typedef struct SColIndex { int16_t colId; // column id diff --git a/src/inc/vnode.h b/src/inc/vnode.h index dddec83da8..39bd2f46c3 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -60,6 +60,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); int32_t vnodeOpen(int32_t vgId); int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg); +int32_t vnodeSync(int32_t vgId); int32_t vnodeClose(int32_t vgId); // vnodeMgmt @@ -89,4 +90,4 @@ int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); } #endif -#endif \ No newline at end of file +#endif diff --git a/src/mnode/inc/mnodeVgroup.h b/src/mnode/inc/mnodeVgroup.h index e052cdb83c..7b798c23f8 100644 --- a/src/mnode/inc/mnodeVgroup.h +++ b/src/mnode/inc/mnodeVgroup.h @@ -49,6 +49,7 @@ void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable); void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle); void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); void mnodeSendAlterVgroupMsg(SVgObj *pVgroup); +void mnodeSendSyncVgroupMsg(SVgObj *pVgroup); SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup); SRpcEpSet mnodeGetEpSetFromIp(char *ep); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 0a82b8f27c..909ca7cac6 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -1186,8 +1186,44 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) { return mnodeDropDb(pMsg); } +static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) { + void *pIter = NULL; + SVgObj *pVgroup = NULL; + while (1) { + pIter = mnodeGetNextVgroup(pIter, &pVgroup); + if (pVgroup == NULL) break; + if (pVgroup->pDb == pDb) { + mnodeSendSyncVgroupMsg(pVgroup); + } + mnodeDecVgroupRef(pVgroup); + } + + mLInfo("db:%s, is synced by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); + + return TSDB_CODE_SUCCESS; +} + static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) { - return 0; + SSyncDbMsg *pSyncDb = pMsg->rpcMsg.pCont; + mDebug("db:%s, syncdb is received from thandle:%p, ignore:%d", pSyncDb->db, pMsg->rpcMsg.handle, pSyncDb->ignoreNotExists); + + if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pSyncDb->db); + if (pMsg->pDb == NULL) { + if (pSyncDb->ignoreNotExists) { + mDebug("db:%s, db is not exist, treat as success", pSyncDb->db); + return TSDB_CODE_SUCCESS; + } else { + mError("db:%s, failed to sync, invalid db", pSyncDb->db); + return TSDB_CODE_MND_INVALID_DB; + } + } + + if (pMsg->pDb->status != TSDB_DB_STATUS_READY) { + mError("db:%s, status:%d, in dropping", pSyncDb->db, pMsg->pDb->status); + return TSDB_CODE_MND_DB_IN_DROPPING; + } + + return mnodeSyncDb(pMsg->pDb, pMsg); } void mnodeDropAllDbs(SAcctObj *pAcct) { diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index fdbf7ae398..7eb3122d83 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -60,6 +60,7 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg); +static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg); static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ; static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); @@ -236,6 +237,7 @@ int32_t mnodeInitVgroups() { mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_VGROUP, mnodeCancelGetNextVgroup); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp); + mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessSyncVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp); mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg); @@ -967,6 +969,38 @@ void mnodeSendAlterVgroupMsg(SVgObj *pVgroup) { } } +static SSyncVnodeMsg *mnodeBuildSyncVnodeMsg(int32_t vgId) { + SSyncVnodeMsg *pSyncVnode = rpcMallocCont(sizeof(SSyncVnodeMsg)); + if (pSyncVnode == NULL) return NULL; + + pSyncVnode->vgId = htonl(vgId); + return pSyncVnode; +} + +static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) { + SSyncVnodeMsg *pSyncVnode = mnodeBuildSyncVnodeMsg(pVgroup->vgId); + SRpcMsg rpcMsg = { + .ahandle = NULL, + .pCont = pSyncVnode, + .contLen = pSyncVnode ? sizeof(SSyncVnodeMsg) : 0, + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_SYNC_VNODE + }; + + dnodeSendMsgToDnode(epSet, &rpcMsg); +} + +void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) { + mDebug("vgId:%d, send sync all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, + pVgroup->dbName); + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); + mDebug("vgId:%d, index:%d, send sync vnode msg to dnode %s", pVgroup->vgId, i, + pVgroup->vnodeGid[i].pDnode->dnodeEp); + mnodeSendSyncVnodeMsg(pVgroup, &epSet); + } +} + static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, void *ahandle) { SCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup); SRpcMsg rpcMsg = { @@ -994,6 +1028,10 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { mDebug("alter vnode rsp received"); } +static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg) { + mDebug("sync vnode rsp received"); +} + static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { if (rpcMsg->ahandle == NULL) return; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 264bbf6b92..76d4379c5f 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -410,7 +410,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force) syncReleaseNode(pNode); } -#if 0 +#if 1 void syncRecover(int64_t rid) { SSyncPeer *pPeer; diff --git a/src/vnode/inc/vnodeMain.h b/src/vnode/inc/vnodeMain.h index 73591bc10d..91a5d632cd 100644 --- a/src/vnode/inc/vnodeMain.h +++ b/src/vnode/inc/vnodeMain.h @@ -25,6 +25,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); int32_t vnodeOpen(int32_t vgId); int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg); +int32_t vnodeSync(int32_t vgId); int32_t vnodeClose(int32_t vgId); void vnodeCleanUp(SVnodeObj *pVnode); void vnodeDestroy(SVnodeObj *pVnode); @@ -33,4 +34,4 @@ void vnodeDestroy(SVnodeObj *pVnode); } #endif -#endif \ No newline at end of file +#endif diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index ded39e67cc..441e951250 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -91,6 +91,23 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) { return code; } +int32_t vnodeSync(int32_t vgId) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vDebug("vgId:%d, failed to sync, vnode not find", vgId); + return TSDB_CODE_VND_INVALID_VGROUP_ID; + } + + if (pVnode->role != TAOS_SYNC_ROLE_MASTER) { + vInfo("vgId:%d, vnode will sync, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); + syncRecover(pVnode->sync); + } + + vnodeRelease(pVnode); + + return TSDB_CODE_SUCCESS; +} + int32_t vnodeDrop(int32_t vgId) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) { -- GitLab