From efc7d7a858ce6ef480c78db2840d7d0a5361b194 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 15 Jul 2020 11:29:29 +0000 Subject: [PATCH] [TD-931] add alter table msg --- src/dnode/src/dnodeMgmt.c | 31 +++++++++++++++++--- src/dnode/src/dnodePeer.c | 1 + src/inc/taosmsg.h | 3 +- src/mnode/inc/mnodeVgroup.h | 2 +- src/mnode/src/mnodeDb.c | 2 +- src/mnode/src/mnodeVgroup.c | 56 +++++++++++++++++++++++++++++++------ 6 files changed, 79 insertions(+), 16 deletions(-) diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 4577a5c31d..338fcd287c 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -72,6 +72,7 @@ static void *dnodeProcessMgmtQueue(void *param); static int32_t dnodeOpenVnodes(); static void dnodeCloseVnodes(); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); +static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); @@ -79,6 +80,7 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); int32_t dnodeInitMgmt() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; @@ -388,7 +390,7 @@ static void dnodeCloseVnodes() { dInfo("total vnodes:%d are all closed", numOfVnodes); } -static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { +static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion); @@ -408,17 +410,38 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId); } + return pCreate; +} + +static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { + SMDCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg); + void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId); if (pVnode != NULL) { - dDebug("vgId:%d, already exist, processed as alter msg", pCreate->cfg.vgId); - int32_t code = vnodeAlter(pVnode, pCreate); + dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId); vnodeRelease(pVnode); - return code; + return TSDB_CODE_SUCCESS; } else { + dDebug("vgId:%d, create vnode msg is received", pCreate->cfg.vgId); return vnodeCreate(pCreate); } } +static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { + SMDAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg); + + void *pVnode = vnodeAcquireVnode(pAlter->cfg.vgId); + if (pVnode != NULL) { + dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId); + int32_t code = vnodeAlter(pVnode, pAlter); + vnodeRelease(pVnode); + return code; + } else { + dError("vgId:%d, vnode not exist, can't alter it", pAlter->cfg.vgId); + return TSDB_CODE_VND_INVALID_VGROUP_ID; + } +} + static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { SMDDropVnodeMsg *pDrop = rpcMsg->pCont; pDrop->vgId = htonl(pDrop->vgId); diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index 18fdb4d090..2a3436583f 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -44,6 +44,7 @@ int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index dec3f60b64..e30efcfd37 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -54,6 +54,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_TABLE, "create-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_TABLE, "drop-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_TABLE, "alter-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_VNODE, "create-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_VNODE, "drop-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" ) @@ -628,7 +629,7 @@ typedef struct { char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN]; SMDVnodeCfg cfg; SMDVnodeDesc nodes[TSDB_MAX_REPLICA]; -} SMDCreateVnodeMsg; +} SMDCreateVnodeMsg, SMDAlterVnodeMsg; typedef struct { char tableId[TSDB_TABLE_ID_LEN]; diff --git a/src/mnode/inc/mnodeVgroup.h b/src/mnode/inc/mnodeVgroup.h index 6ddf8e44b9..3f1da89605 100644 --- a/src/mnode/inc/mnodeVgroup.h +++ b/src/mnode/inc/mnodeVgroup.h @@ -44,9 +44,9 @@ int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_ void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable); void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable); -void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle); void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle); void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); +void mnodeSendAlterVgroupMsg(SVgObj *pVgroup); SRpcIpSet mnodeGetIpSetFromVgroup(SVgObj *pVgroup); SRpcIpSet mnodeGetIpSetFromIp(char *ep); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index e3b1603856..fb97d6f380 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -913,7 +913,7 @@ static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) { pIter = mnodeGetNextVgroup(pIter, &pVgroup); if (pVgroup == NULL) break; if (pVgroup->pDb == pDb) { - mnodeSendCreateVgroupMsg(pVgroup, NULL); + mnodeSendAlterVgroupMsg(pVgroup); } mnodeDecVgroupRef(pVgroup); } diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 05e300d242..ee75abc2f2 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -50,6 +50,7 @@ static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup); static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg); +static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg); static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ; static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); @@ -217,6 +218,7 @@ int32_t mnodeInitVgroups() { mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mnodeGetVgroupMeta); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mnodeRetrieveVgroups); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp); + mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp); mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg); @@ -247,7 +249,7 @@ void mnodeUpdateVgroup(SVgObj *pVgroup) { if (sdbUpdateRow(&oper) != TSDB_CODE_SUCCESS) { mError("vgId:%d, failed to update vgroup", pVgroup->vgId); } - mnodeSendCreateVgroupMsg(pVgroup, NULL); + mnodeSendAlterVgroupMsg(pVgroup); } /* @@ -272,9 +274,18 @@ void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_t o } if (i == openVnodes && pVgroup->status == TAOS_VG_STATUS_READY) { - mnodeSendCreateVgroupMsg(pVgroup, NULL); + SDbObj *pDb = pVgroup->pDb; + if (pDb->cfgVersion != 0) { + mDebug("vgId:%d, not exist in dnode:%d and cfgVersion is %d, send alter msg", pVgroup->vgId, pDnode->dnodeId, + pDb->cfgVersion); + mnodeSendAlterVgroupMsg(pVgroup); + } else { + mDebug("vgId:%d, not exist in dnode:%d and cfgVersion is %d, send create msg", pVgroup->vgId, pDnode->dnodeId, + pDb->cfgVersion); + mnodeSendCreateVgroupMsg(pVgroup, NULL); + } } - + mnodeDecVgroupRef(pVgroup); } @@ -314,7 +325,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl mError("dnode:%d, vgId:%d, vnode cfgVersion:%d repica:%d not match with mnode cfgVersion:%d replica:%d", pDnode->dnodeId, pVload->vgId, pVload->cfgVersion, pVload->replica, pVgroup->pDb->cfgVersion, pVgroup->numOfVnodes); - mnodeSendCreateVgroupMsg(pVgroup, NULL); + mnodeSendAlterVgroupMsg(pVgroup); } } @@ -536,7 +547,7 @@ void mnodeCleanupVgroups() { tsVgroupSdb = NULL; } -int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { +static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { SDbObj *pDb = mnodeGetDb(pShow->db); if (pDb == NULL) { return TSDB_CODE_MND_DB_NOT_SELECTED; @@ -630,7 +641,7 @@ static bool mnodeFilterVgroups(SVgObj *pVgroup, STableObj *pTable) { } } -int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; SVgObj *pVgroup = NULL; int32_t cols = 0; @@ -733,7 +744,7 @@ void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { } } -SMDCreateVnodeMsg *mnodeBuildCreateVnodeMsg(SVgObj *pVgroup) { +static SMDCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) { SDbObj *pDb = pVgroup->pDb; if (pDb == NULL) return NULL; @@ -800,8 +811,31 @@ SRpcIpSet mnodeGetIpSetFromIp(char *ep) { return ipSet; } -void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { - SMDCreateVnodeMsg *pCreate = mnodeBuildCreateVnodeMsg(pVgroup); +static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet) { + SMDAlterVnodeMsg *pAlter = mnodeBuildVnodeMsg(pVgroup); + SRpcMsg rpcMsg = { + .ahandle = NULL, + .pCont = pAlter, + .contLen = pAlter ? sizeof(SMDAlterVnodeMsg) : 0, + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_ALTER_VNODE + }; + dnodeSendMsgToDnode(ipSet, &rpcMsg); +} + +void mnodeSendAlterVgroupMsg(SVgObj *pVgroup) { + mDebug("vgId:%d, send alter all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, + pVgroup->dbName); + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); + mDebug("vgId:%d, index:%d, send alter vnode msg to dnode %s", pVgroup->vgId, i, + pVgroup->vnodeGid[i].pDnode->dnodeEp); + mnodeSendAlterVnodeMsg(pVgroup, &ipSet); + } +} + +static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { + SMDCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup); SRpcMsg rpcMsg = { .ahandle = ahandle, .pCont = pCreate, @@ -823,6 +857,10 @@ void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { } } +static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { + mDebug("alter vnode rsp received"); +} + static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { if (rpcMsg->ahandle == NULL) return; -- GitLab