提交 4af02b78 编写于 作者: M Minglei Jin

[TD-3036]<feature>: syncdb <db_name> replica;

resync slave vnodes to keep consistent with master vnodes.
上级 85127267
......@@ -43,6 +43,7 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue;
......
......@@ -30,6 +30,7 @@ static taos_queue tsVMgmtQueue = NULL;
static void * dnodeProcessMgmtQueue(void *param);
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
......@@ -39,6 +40,7 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int32_t dnodeInitVMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeProcessSyncVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
......@@ -179,6 +181,13 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
}
}
static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *rpcMsg) {
SSyncVnodeMsg *pSyncVnode = rpcMsg->pCont;
pSyncVnode->vgId = htonl(pSyncVnode->vgId);
return vnodeSync(pSyncVnode->vgId);
}
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId);
......
......@@ -202,10 +202,11 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
char clusterId[TSDB_CLUSTER_ID_LEN];
dnodeGetClusterId(clusterId);
if (clusterId[0] != '\0') {
dError("exit zombie dropped dnode");
exit(EXIT_FAILURE);
dError("exit zombie dropped dnode");
exit(EXIT_FAILURE);
}
}
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return;
}
......
......@@ -59,6 +59,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_SYNC_VNODE, "sync-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
......@@ -389,7 +390,7 @@ typedef struct {
typedef struct {
int32_t vgId;
} SDropVnodeMsg;
} SDropVnodeMsg, SSyncVnodeMsg;
typedef struct SColIndex {
int16_t colId; // column id
......
......@@ -60,6 +60,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId);
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeSync(int32_t vgId);
int32_t vnodeClose(int32_t vgId);
// vnodeMgmt
......@@ -89,4 +90,4 @@ int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
}
#endif
#endif
\ No newline at end of file
#endif
......@@ -49,6 +49,7 @@ void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable);
void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle);
void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
void mnodeSendAlterVgroupMsg(SVgObj *pVgroup);
void mnodeSendSyncVgroupMsg(SVgObj *pVgroup);
SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup);
SRpcEpSet mnodeGetEpSetFromIp(char *ep);
......
......@@ -1186,8 +1186,44 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
return mnodeDropDb(pMsg);
}
static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) {
void *pIter = NULL;
SVgObj *pVgroup = NULL;
while (1) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (pVgroup->pDb == pDb) {
mnodeSendSyncVgroupMsg(pVgroup);
}
mnodeDecVgroupRef(pVgroup);
}
mLInfo("db:%s, is synced by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) {
return 0;
SSyncDbMsg *pSyncDb = pMsg->rpcMsg.pCont;
mDebug("db:%s, syncdb is received from thandle:%p, ignore:%d", pSyncDb->db, pMsg->rpcMsg.handle, pSyncDb->ignoreNotExists);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pSyncDb->db);
if (pMsg->pDb == NULL) {
if (pSyncDb->ignoreNotExists) {
mDebug("db:%s, db is not exist, treat as success", pSyncDb->db);
return TSDB_CODE_SUCCESS;
} else {
mError("db:%s, failed to sync, invalid db", pSyncDb->db);
return TSDB_CODE_MND_INVALID_DB;
}
}
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pSyncDb->db, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
return mnodeSyncDb(pMsg->pDb, pMsg);
}
void mnodeDropAllDbs(SAcctObj *pAcct) {
......
......@@ -60,6 +60,7 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ;
static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
......@@ -236,6 +237,7 @@ int32_t mnodeInitVgroups() {
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_VGROUP, mnodeCancelGetNextVgroup);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessSyncVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp);
mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg);
......@@ -967,6 +969,38 @@ void mnodeSendAlterVgroupMsg(SVgObj *pVgroup) {
}
}
static SSyncVnodeMsg *mnodeBuildSyncVnodeMsg(int32_t vgId) {
SSyncVnodeMsg *pSyncVnode = rpcMallocCont(sizeof(SSyncVnodeMsg));
if (pSyncVnode == NULL) return NULL;
pSyncVnode->vgId = htonl(vgId);
return pSyncVnode;
}
static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
SSyncVnodeMsg *pSyncVnode = mnodeBuildSyncVnodeMsg(pVgroup->vgId);
SRpcMsg rpcMsg = {
.ahandle = NULL,
.pCont = pSyncVnode,
.contLen = pSyncVnode ? sizeof(SSyncVnodeMsg) : 0,
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_SYNC_VNODE
};
dnodeSendMsgToDnode(epSet, &rpcMsg);
}
void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) {
mDebug("vgId:%d, send sync all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes,
pVgroup->dbName);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
mDebug("vgId:%d, index:%d, send sync vnode msg to dnode %s", pVgroup->vgId, i,
pVgroup->vnodeGid[i].pDnode->dnodeEp);
mnodeSendSyncVnodeMsg(pVgroup, &epSet);
}
}
static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, void *ahandle) {
SCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup);
SRpcMsg rpcMsg = {
......@@ -994,6 +1028,10 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) {
mDebug("alter vnode rsp received");
}
static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg) {
mDebug("sync vnode rsp received");
}
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->ahandle == NULL) return;
......
......@@ -410,7 +410,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force)
syncReleaseNode(pNode);
}
#if 0
#if 1
void syncRecover(int64_t rid) {
SSyncPeer *pPeer;
......
......@@ -25,6 +25,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId);
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeSync(int32_t vgId);
int32_t vnodeClose(int32_t vgId);
void vnodeCleanUp(SVnodeObj *pVnode);
void vnodeDestroy(SVnodeObj *pVnode);
......@@ -33,4 +34,4 @@ void vnodeDestroy(SVnodeObj *pVnode);
}
#endif
#endif
\ No newline at end of file
#endif
......@@ -91,6 +91,23 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
return code;
}
int32_t vnodeSync(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vDebug("vgId:%d, failed to sync, vnode not find", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vInfo("vgId:%d, vnode will sync, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
syncRecover(pVnode->sync);
}
vnodeRelease(pVnode);
return TSDB_CODE_SUCCESS;
}
int32_t vnodeDrop(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册