From b43ffec58649b06c273b94b2b50210c6ed6f56d8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 11 Oct 2020 21:17:38 +0800 Subject: [PATCH] TD-1671 --- src/balance/src/balance.c | 8 ++-- src/dnode/src/dnodeMgmt.c | 28 +++++++++++-- src/dnode/src/dnodePeer.c | 7 +++- src/dnode/src/dnodeShell.c | 4 +- src/inc/dnode.h | 3 +- src/inc/taoserror.h | 2 + src/inc/taosmsg.h | 7 +++- src/mnode/inc/mnodeMnode.h | 2 +- src/mnode/src/mnodeDnode.c | 2 +- src/mnode/src/mnodeMnode.c | 62 +++++++++++++++++++++++++--- src/mnode/src/mnodePeer.c | 2 +- src/mnode/src/mnodeRead.c | 2 +- src/mnode/src/mnodeWrite.c | 2 +- tests/script/unique/mnode/mgmt21.sim | 6 +-- 14 files changed, 111 insertions(+), 26 deletions(-) diff --git a/src/balance/src/balance.c b/src/balance/src/balance.c index 3b9af741c3..0d4da965e2 100644 --- a/src/balance/src/balance.c +++ b/src/balance/src/balance.c @@ -957,11 +957,11 @@ static void balanceMonitorDnodeModule() { continue; } - mLInfo("dnode:%d, numOfMnodes:%d expect:%d, add mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes); - mnodeAddMnode(pDnode->dnodeId); + mLInfo("dnode:%d, numOfMnodes:%d expect:%d, create mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes); + mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, true); - numOfMnodes = mnodeGetMnodesNum(); - if (numOfMnodes >= tsNumOfMnodes) return; + // Only create one mnode each time + return; } } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 005a471b76..c05cd24c1d 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -74,14 +74,16 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); +static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg); static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); int32_t dnodeInitMgmt() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg; dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp); dnodeReadDnodeCfg(); @@ -451,10 +453,28 @@ static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { } static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { - SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont; + SMDCfgDnodeMsg *pCfg = pMsg->pCont; return taosCfgDynamicOptions(pCfg->config); } +static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) { + SMDCreateMnodeMsg *pCfg = pMsg->pCont; + if (pCfg->dnodeId != dnodeGetDnodeId()) { + dError("dnodeId:%d in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId()); + return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; + } + + if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) { + dError("dnodeEp:%s in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp); + return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED; + } + + dDebug("dnodeId:%d, create mnode msg is received", pCfg->dnodeId); + dnodeStartMnode(); + + return TSDB_CODE_SUCCESS; +} + void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { if (pEpSet->numOfEps <= 0) { dError("mnode EP list for peer is changed, but content is invalid, discard it"); @@ -466,9 +486,10 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { pEpSet->port[i] -= TSDB_PORT_DNODEDNODE; dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]); +#if 0 if (!mnodeIsRunning()) { if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) { - dInfo("mnode index:%d %s:%u should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]); + dInfo("mnode index:%d %s:%u self should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]); bool find = false; for (int i = 0; i < tsDMnodeInfos.nodeNum; ++i) { if (tsDMnodeInfos.nodeInfos[i].nodeId == dnodeGetDnodeId()) { @@ -488,6 +509,7 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { dnodeStartMnode(); } } +#endif } tsDMnodeEpSet = *pEpSet; diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index c09d742239..3bc2f7b48b 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -48,6 +48,7 @@ int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue; @@ -170,8 +171,12 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsDnodeClientRpc, epSet, rpcMsg); } -void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { +void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { SRpcEpSet epSet = {0}; dnodeGetMnodeEpSetForPeer(&epSet); rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp); } + +void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { + rpcSendRecv(tsDnodeClientRpc, epSet, rpcMsg, rpcRsp); +} \ No newline at end of file diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 4a1d337824..a5c5d4759b 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -156,7 +156,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char dDebug("user:%s, send auth msg to mnodes", user); SRpcMsg rpcRsp = {0}; - dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp); + dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); @@ -189,7 +189,7 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) { rpcMsg.msgType = TSDB_MSG_TYPE_DM_CONFIG_TABLE; SRpcMsg rpcRsp = {0}; - dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp); + dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); terrno = rpcRsp.code; if (rpcRsp.code != 0) { diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 017241c4f8..1757bda812 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -47,7 +47,8 @@ bool dnodeStartMnode(); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); -void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); +void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); +void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid); void *dnodeAllocateVnodeWqueue(void *pVnode); diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index b5d22ea80c..786342b5a6 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -139,6 +139,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, 0, 0x0339, "Vgroup alr TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_FREE, 0, 0x033A, "Dnode not avaliable") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_ID, 0, 0x033B, "Cluster id not match") TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, 0, 0x033C, "Cluster not ready") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED, 0, 0x033D, "Dnode Id not configured") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED, 0, 0x033E, "Dnode Ep not configured") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, 0, 0x0340, "Account already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, 0, 0x0341, "Invalid account") diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 50b31a86cc..c6e9b65abf 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -59,7 +59,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) @@ -719,6 +719,11 @@ typedef struct { char ep[TSDB_EP_LEN]; // end point, hostname:port } SCMCreateDnodeMsg, SCMDropDnodeMsg; +typedef struct { + int32_t dnodeId; + char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port +} SMDCreateMnodeMsg; + typedef struct { int32_t dnodeId; int32_t vgId; diff --git a/src/mnode/inc/mnodeMnode.h b/src/mnode/inc/mnodeMnode.h index 0976ea8acd..a28a03ea40 100644 --- a/src/mnode/inc/mnodeMnode.h +++ b/src/mnode/inc/mnodeMnode.h @@ -31,7 +31,7 @@ typedef enum { int32_t mnodeInitMnodes(); void mnodeCleanupMnodes(); -int32_t mnodeAddMnode(int32_t dnodeId); +void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm); int32_t mnodeDropMnode(int32_t dnodeId); void mnodeDropMnodeLocal(int32_t dnodeId); diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 040bc259cc..4c777e4eed 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -147,7 +147,7 @@ static int32_t mnodeDnodeActionRestored() { mnodeCreateDnode(tsLocalEp, NULL); SDnodeObj *pDnode = mnodeGetDnodeByEp(tsLocalEp); if (pDnode != NULL) { - mnodeAddMnode(pDnode->dnodeId); + mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, false); mnodeDecDnodeRef(pDnode); } } diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index 8736e30217..93d5bc4f22 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -23,6 +23,8 @@ #include "tutil.h" #include "tsocket.h" #include "tdataformat.h" +#include "dnode.h" +#include "mnode.h" #include "mnodeDef.h" #include "mnodeInt.h" #include "mnodeMnode.h" @@ -30,6 +32,7 @@ #include "mnodeSdb.h" #include "mnodeShow.h" #include "mnodeUser.h" +#include "mnodeVgroup.h" static void * tsMnodeSdb = NULL; static int32_t tsMnodeUpdateSize = 0; @@ -266,7 +269,46 @@ void mnodeGetMnodeInfos(void *mnodeInfos) { mnodeMnodeUnLock(); } -int32_t mnodeAddMnode(int32_t dnodeId) { +static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) { + mDebug("dnode:%d, send create mnode msg to dnode %s", dnodeId, dnodeEp); + + SMDCreateMnodeMsg *pCreate = rpcMallocCont(sizeof(SMDCreateMnodeMsg)); + if (pCreate == NULL) { + return TSDB_CODE_MND_OUT_OF_MEMORY; + } else { + pCreate->dnodeId = dnodeId; + tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp)); + } + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pCreate; + rpcMsg.contLen = sizeof(SMDCreateMnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_MD_CREATE_MNODE; + + SRpcMsg rpcRsp = {0}; + SRpcEpSet epSet = mnodeGetEpSetFromIp(pCreate->dnodeEp); + dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp, &epSet); + + if (rpcRsp.code != TSDB_CODE_SUCCESS) { + mError("dnode:%d, failed to send create mnode msg, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(rpcRsp.code)); + } + + rpcFreeCont(rpcRsp.pCont); + return rpcRsp.code; +} + +static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) { + if (code != TSDB_CODE_SUCCESS) { + mError("failed to create mnode, reason:%s", tstrerror(code)); + } else { + mDebug("mnode is created"); + mnodeUpdateMnodeEpSet(); + } + + return code; +} + +void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) { SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj)); pMnode->mnodeId = dnodeId; pMnode->createdTime = taosGetTimestampMs(); @@ -275,16 +317,24 @@ int32_t mnodeAddMnode(int32_t dnodeId) { .type = SDB_OPER_GLOBAL, .table = tsMnodeSdb, .pObj = pMnode, + .writeCb = mnodeCreateMnodeCb }; - int32_t code = sdbInsertRow(&oper); - if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { - taosTFree(pMnode); + int32_t code = TSDB_CODE_SUCCESS; + if (needConfirm) { + code = mnodeSendCreateMnodeMsg(dnodeId, dnodeEp); } - mnodeUpdateMnodeEpSet(); + if (code != TSDB_CODE_SUCCESS) { + taosTFree(pMnode); + return; + } - return code; + code = sdbInsertRow(&oper); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("dnode:%d, failed to create mnode, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(code)); + taosTFree(pMnode); + } } void mnodeDropMnodeLocal(int32_t dnodeId) { diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index 06eec12ccb..885029605a 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -58,7 +58,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { rpcRsp->rsp = epSet; rpcRsp->len = sizeof(SRpcEpSet); - mDebug("%p, msg:%s in mpeer queue, will be redireced, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, + mDebug("%p, msg:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); for (int32_t i = 0; i < epSet->numOfEps; ++i) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { diff --git a/src/mnode/src/mnodeRead.c b/src/mnode/src/mnodeRead.c index dab19a2993..93b944febb 100644 --- a/src/mnode/src/mnodeRead.c +++ b/src/mnode/src/mnodeRead.c @@ -52,7 +52,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); mnodeGetMnodeEpSetForShell(epSet); - mDebug("%p, msg:%s in mread queue, will be redireced, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, + mDebug("%p, msg:%s in mread queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); for (int32_t i = 0; i < epSet->numOfEps; ++i) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { diff --git a/src/mnode/src/mnodeWrite.c b/src/mnode/src/mnodeWrite.c index 315f1585b2..d021745d2b 100644 --- a/src/mnode/src/mnodeWrite.c +++ b/src/mnode/src/mnodeWrite.c @@ -54,7 +54,7 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { rpcRsp->rsp = epSet; rpcRsp->len = sizeof(SRpcEpSet); - mDebug("app:%p:%p, msg:%s in write queue, will be redireced, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, pMsg, + mDebug("app:%p:%p, msg:%s in write queue, will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); for (int32_t i = 0; i < epSet->numOfEps; ++i) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { diff --git a/tests/script/unique/mnode/mgmt21.sim b/tests/script/unique/mnode/mgmt21.sim index 53ad0eebe7..a481f907e2 100644 --- a/tests/script/unique/mnode/mgmt21.sim +++ b/tests/script/unique/mnode/mgmt21.sim @@ -25,12 +25,12 @@ sql create dnode $hostname2 $x = 0 show2: $x = $x + 1 - sleep 2000 - if $x == 10 then + sleep 4000 + if $x == 5 then return -1 endi -sql show mnodes +sql show mnodes -x show2 print dnode1 ==> $data2_1 print dnode2 ==> $data2_2 if $data2_1 != master then -- GitLab