提交 b43ffec5 编写于 作者: S Shengliang Guan

TD-1671

上级 2159c68e
......@@ -957,11 +957,11 @@ static void balanceMonitorDnodeModule() {
continue;
}
mLInfo("dnode:%d, numOfMnodes:%d expect:%d, add mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes);
mnodeAddMnode(pDnode->dnodeId);
mLInfo("dnode:%d, numOfMnodes:%d expect:%d, create mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes);
mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, true);
numOfMnodes = mnodeGetMnodesNum();
if (numOfMnodes >= tsNumOfMnodes) return;
// Only create one mnode each time
return;
}
}
......
......@@ -74,14 +74,16 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg;
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
dnodeReadDnodeCfg();
......@@ -451,10 +453,28 @@ static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
}
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont;
SMDCfgDnodeMsg *pCfg = pMsg->pCont;
return taosCfgDynamicOptions(pCfg->config);
}
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
SMDCreateMnodeMsg *pCfg = pMsg->pCont;
if (pCfg->dnodeId != dnodeGetDnodeId()) {
dError("dnodeId:%d in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
}
if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
dError("dnodeEp:%s in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
}
dDebug("dnodeId:%d, create mnode msg is received", pCfg->dnodeId);
dnodeStartMnode();
return TSDB_CODE_SUCCESS;
}
void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
if (pEpSet->numOfEps <= 0) {
dError("mnode EP list for peer is changed, but content is invalid, discard it");
......@@ -466,9 +486,10 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
#if 0
if (!mnodeIsRunning()) {
if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) {
dInfo("mnode index:%d %s:%u should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]);
dInfo("mnode index:%d %s:%u self should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]);
bool find = false;
for (int i = 0; i < tsDMnodeInfos.nodeNum; ++i) {
if (tsDMnodeInfos.nodeInfos[i].nodeId == dnodeGetDnodeId()) {
......@@ -488,6 +509,7 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
dnodeStartMnode();
}
}
#endif
}
tsDMnodeEpSet = *pEpSet;
......
......@@ -48,6 +48,7 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue;
......@@ -170,8 +171,12 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsDnodeClientRpc, epSet, rpcMsg);
}
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet);
rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp);
}
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
rpcSendRecv(tsDnodeClientRpc, epSet, rpcMsg, rpcRsp);
}
\ No newline at end of file
......@@ -156,7 +156,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
dDebug("user:%s, send auth msg to mnodes", user);
SRpcMsg rpcRsp = {0};
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp);
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
......@@ -189,7 +189,7 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) {
rpcMsg.msgType = TSDB_MSG_TYPE_DM_CONFIG_TABLE;
SRpcMsg rpcRsp = {0};
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp);
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
terrno = rpcRsp.code;
if (rpcRsp.code != 0) {
......
......@@ -47,7 +47,8 @@ bool dnodeStartMnode();
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid);
void *dnodeAllocateVnodeWqueue(void *pVnode);
......
......@@ -139,6 +139,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, 0, 0x0339, "Vgroup alr
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_FREE, 0, 0x033A, "Dnode not avaliable")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_ID, 0, 0x033B, "Cluster id not match")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, 0, 0x033C, "Cluster not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED, 0, 0x033D, "Dnode Id not configured")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED, 0, 0x033E, "Dnode Ep not configured")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, 0, 0x0340, "Account already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, 0, 0x0341, "Invalid account")
......
......@@ -59,7 +59,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
......@@ -719,6 +719,11 @@ typedef struct {
char ep[TSDB_EP_LEN]; // end point, hostname:port
} SCMCreateDnodeMsg, SCMDropDnodeMsg;
typedef struct {
int32_t dnodeId;
char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port
} SMDCreateMnodeMsg;
typedef struct {
int32_t dnodeId;
int32_t vgId;
......
......@@ -31,7 +31,7 @@ typedef enum {
int32_t mnodeInitMnodes();
void mnodeCleanupMnodes();
int32_t mnodeAddMnode(int32_t dnodeId);
void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm);
int32_t mnodeDropMnode(int32_t dnodeId);
void mnodeDropMnodeLocal(int32_t dnodeId);
......
......@@ -147,7 +147,7 @@ static int32_t mnodeDnodeActionRestored() {
mnodeCreateDnode(tsLocalEp, NULL);
SDnodeObj *pDnode = mnodeGetDnodeByEp(tsLocalEp);
if (pDnode != NULL) {
mnodeAddMnode(pDnode->dnodeId);
mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, false);
mnodeDecDnodeRef(pDnode);
}
}
......
......@@ -23,6 +23,8 @@
#include "tutil.h"
#include "tsocket.h"
#include "tdataformat.h"
#include "dnode.h"
#include "mnode.h"
#include "mnodeDef.h"
#include "mnodeInt.h"
#include "mnodeMnode.h"
......@@ -30,6 +32,7 @@
#include "mnodeSdb.h"
#include "mnodeShow.h"
#include "mnodeUser.h"
#include "mnodeVgroup.h"
static void * tsMnodeSdb = NULL;
static int32_t tsMnodeUpdateSize = 0;
......@@ -266,7 +269,46 @@ void mnodeGetMnodeInfos(void *mnodeInfos) {
mnodeMnodeUnLock();
}
int32_t mnodeAddMnode(int32_t dnodeId) {
static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) {
mDebug("dnode:%d, send create mnode msg to dnode %s", dnodeId, dnodeEp);
SMDCreateMnodeMsg *pCreate = rpcMallocCont(sizeof(SMDCreateMnodeMsg));
if (pCreate == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
} else {
pCreate->dnodeId = dnodeId;
tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
}
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pCreate;
rpcMsg.contLen = sizeof(SMDCreateMnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_MD_CREATE_MNODE;
SRpcMsg rpcRsp = {0};
SRpcEpSet epSet = mnodeGetEpSetFromIp(pCreate->dnodeEp);
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp, &epSet);
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
mError("dnode:%d, failed to send create mnode msg, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(rpcRsp.code));
}
rpcFreeCont(rpcRsp.pCont);
return rpcRsp.code;
}
static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
mError("failed to create mnode, reason:%s", tstrerror(code));
} else {
mDebug("mnode is created");
mnodeUpdateMnodeEpSet();
}
return code;
}
void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
pMnode->mnodeId = dnodeId;
pMnode->createdTime = taosGetTimestampMs();
......@@ -275,16 +317,24 @@ int32_t mnodeAddMnode(int32_t dnodeId) {
.type = SDB_OPER_GLOBAL,
.table = tsMnodeSdb,
.pObj = pMnode,
.writeCb = mnodeCreateMnodeCb
};
int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
taosTFree(pMnode);
int32_t code = TSDB_CODE_SUCCESS;
if (needConfirm) {
code = mnodeSendCreateMnodeMsg(dnodeId, dnodeEp);
}
mnodeUpdateMnodeEpSet();
if (code != TSDB_CODE_SUCCESS) {
taosTFree(pMnode);
return;
}
return code;
code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnode:%d, failed to create mnode, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(code));
taosTFree(pMnode);
}
}
void mnodeDropMnodeLocal(int32_t dnodeId) {
......
......@@ -58,7 +58,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet);
mDebug("%p, msg:%s in mpeer queue, will be redireced, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
mDebug("%p, msg:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
......
......@@ -52,7 +52,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
mnodeGetMnodeEpSetForShell(epSet);
mDebug("%p, msg:%s in mread queue, will be redireced, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
mDebug("%p, msg:%s in mread queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
......
......@@ -54,7 +54,7 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet);
mDebug("app:%p:%p, msg:%s in write queue, will be redireced, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, pMsg,
mDebug("app:%p:%p, msg:%s in write queue, will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, pMsg,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
......
......@@ -25,12 +25,12 @@ sql create dnode $hostname2
$x = 0
show2:
$x = $x + 1
sleep 2000
if $x == 10 then
sleep 4000
if $x == 5 then
return -1
endi
sql show mnodes
sql show mnodes -x show2
print dnode1 ==> $data2_1
print dnode2 ==> $data2_2
if $data2_1 != master then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册