提交 f7656ec7 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

Merge branch 'patch/TD-1669' of https://github.com/taosdata/TDengine into patch/TD-1669

...@@ -957,11 +957,11 @@ static void balanceMonitorDnodeModule() { ...@@ -957,11 +957,11 @@ static void balanceMonitorDnodeModule() {
continue; continue;
} }
mLInfo("dnode:%d, numOfMnodes:%d expect:%d, add mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes); mLInfo("dnode:%d, numOfMnodes:%d expect:%d, create mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes);
mnodeAddMnode(pDnode->dnodeId); mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, true);
numOfMnodes = mnodeGetMnodesNum(); // Only create one mnode each time
if (numOfMnodes >= tsNumOfMnodes) return; return;
} }
} }
......
...@@ -148,7 +148,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) { ...@@ -148,7 +148,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) {
while (1) { while (1) {
if (taosReadQitemFromQset(tsMPeerQset, &type, (void **)&pPeerMsg, &unUsed) == 0) { if (taosReadQitemFromQset(tsMPeerQset, &type, (void **)&pPeerMsg, &unUsed) == 0) {
dDebug("dnodeProcessMnodePeerQueue: got no message from qset, exiting..."); dDebug("qset:%p, mnode peer got no message from qset, exiting", tsMPeerQset);
break; break;
} }
......
...@@ -156,7 +156,7 @@ static void *dnodeProcessMnodeReadQueue(void *param) { ...@@ -156,7 +156,7 @@ static void *dnodeProcessMnodeReadQueue(void *param) {
while (1) { while (1) {
if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pReadMsg, &unUsed) == 0) { if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pReadMsg, &unUsed) == 0) {
dDebug("dnodeProcessMnodeReadQueue: got no message from qset, exiting..."); dDebug("qset:%p, mnode read got no message from qset, exiting", tsMReadQset);
break; break;
} }
......
...@@ -158,7 +158,7 @@ static void *dnodeProcessMnodeWriteQueue(void *param) { ...@@ -158,7 +158,7 @@ static void *dnodeProcessMnodeWriteQueue(void *param) {
while (1) { while (1) {
if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) { if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) {
dDebug("dnodeProcessMnodeWriteQueue: got no message from qset, exiting..."); dDebug("qset:%p, mnode write got no message from qset, exiting", tsMWriteQset);
break; break;
} }
......
...@@ -74,14 +74,16 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); ...@@ -74,14 +74,16 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int32_t dnodeInitMgmt() { int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg;
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
dnodeReadDnodeCfg(); dnodeReadDnodeCfg();
...@@ -226,7 +228,7 @@ static void *dnodeProcessMgmtQueue(void *param) { ...@@ -226,7 +228,7 @@ static void *dnodeProcessMgmtQueue(void *param) {
while (1) { while (1) {
if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) { if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) {
dDebug("dnode mgmt got no message from qset, exit ..."); dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset);
break; break;
} }
...@@ -451,10 +453,28 @@ static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { ...@@ -451,10 +453,28 @@ static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
} }
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont; SMDCfgDnodeMsg *pCfg = pMsg->pCont;
return taosCfgDynamicOptions(pCfg->config); return taosCfgDynamicOptions(pCfg->config);
} }
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
SMDCreateMnodeMsg *pCfg = pMsg->pCont;
if (pCfg->dnodeId != dnodeGetDnodeId()) {
dError("dnodeId:%d in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
}
if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
dError("dnodeEp:%s in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
}
dDebug("dnodeId:%d, create mnode msg is received", pCfg->dnodeId);
dnodeStartMnode();
return TSDB_CODE_SUCCESS;
}
void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
if (pEpSet->numOfEps <= 0) { if (pEpSet->numOfEps <= 0) {
dError("mnode EP list for peer is changed, but content is invalid, discard it"); dError("mnode EP list for peer is changed, but content is invalid, discard it");
...@@ -466,9 +486,10 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { ...@@ -466,9 +486,10 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE; pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]); dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
#if 0
if (!mnodeIsRunning()) { if (!mnodeIsRunning()) {
if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) { if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) {
dInfo("mnode index:%d %s:%u should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]); dInfo("mnode index:%d %s:%u self should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]);
bool find = false; bool find = false;
for (int i = 0; i < tsDMnodeInfos.nodeNum; ++i) { for (int i = 0; i < tsDMnodeInfos.nodeNum; ++i) {
if (tsDMnodeInfos.nodeInfos[i].nodeId == dnodeGetDnodeId()) { if (tsDMnodeInfos.nodeInfos[i].nodeId == dnodeGetDnodeId()) {
...@@ -488,6 +509,7 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { ...@@ -488,6 +509,7 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
dnodeStartMnode(); dnodeStartMnode();
} }
} }
#endif
} }
tsDMnodeEpSet = *pEpSet; tsDMnodeEpSet = *pEpSet;
......
...@@ -48,6 +48,7 @@ int32_t dnodeInitServer() { ...@@ -48,6 +48,7 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue;
...@@ -170,8 +171,12 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { ...@@ -170,8 +171,12 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsDnodeClientRpc, epSet, rpcMsg); rpcSendRequest(tsDnodeClientRpc, epSet, rpcMsg);
} }
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0}; SRpcEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet); dnodeGetMnodeEpSetForPeer(&epSet);
rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp); rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp);
} }
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
rpcSendRecv(tsDnodeClientRpc, epSet, rpcMsg, rpcRsp);
}
\ No newline at end of file
...@@ -156,7 +156,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char ...@@ -156,7 +156,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
dDebug("user:%s, send auth msg to mnodes", user); dDebug("user:%s, send auth msg to mnodes", user);
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp); dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
...@@ -189,7 +189,7 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) { ...@@ -189,7 +189,7 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) {
rpcMsg.msgType = TSDB_MSG_TYPE_DM_CONFIG_TABLE; rpcMsg.msgType = TSDB_MSG_TYPE_DM_CONFIG_TABLE;
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp); dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
terrno = rpcRsp.code; terrno = rpcRsp.code;
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
......
...@@ -199,7 +199,7 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -199,7 +199,7 @@ static void *dnodeProcessReadQueue(void *param) {
while (1) { while (1) {
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) { if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) {
dDebug("dnodeProcessReadQueee: got no message from qset, exiting..."); dDebug("qset:%p dnode read got no message from qset, exiting", readQset);
break; break;
} }
......
...@@ -222,7 +222,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -222,7 +222,7 @@ static void *dnodeProcessWriteQueue(void *param) {
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
dDebug("dnodeProcessWriteQueee: got no message from qset, exiting..."); dDebug("qset:%p, dnode write got no message from qset, exiting", pWorker->qset);
break; break;
} }
......
...@@ -47,7 +47,8 @@ bool dnodeStartMnode(); ...@@ -47,7 +47,8 @@ bool dnodeStartMnode();
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid);
void *dnodeAllocateVnodeWqueue(void *pVnode); void *dnodeAllocateVnodeWqueue(void *pVnode);
......
...@@ -139,6 +139,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, 0, 0x0339, "Vgroup alr ...@@ -139,6 +139,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, 0, 0x0339, "Vgroup alr
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_FREE, 0, 0x033A, "Dnode not avaliable") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_FREE, 0, 0x033A, "Dnode not avaliable")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_ID, 0, 0x033B, "Cluster id not match") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_ID, 0, 0x033B, "Cluster id not match")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, 0, 0x033C, "Cluster not ready") TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, 0, 0x033C, "Cluster not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED, 0, 0x033D, "Dnode Id not configured")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED, 0, 0x033E, "Dnode Ep not configured")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, 0, 0x0340, "Account already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, 0, 0x0340, "Account already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, 0, 0x0341, "Invalid account") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, 0, 0x0341, "Invalid account")
......
...@@ -59,7 +59,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" ) ...@@ -59,7 +59,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
...@@ -719,6 +719,11 @@ typedef struct { ...@@ -719,6 +719,11 @@ typedef struct {
char ep[TSDB_EP_LEN]; // end point, hostname:port char ep[TSDB_EP_LEN]; // end point, hostname:port
} SCMCreateDnodeMsg, SCMDropDnodeMsg; } SCMCreateDnodeMsg, SCMDropDnodeMsg;
typedef struct {
int32_t dnodeId;
char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port
} SMDCreateMnodeMsg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t vgId; int32_t vgId;
......
...@@ -31,7 +31,7 @@ typedef enum { ...@@ -31,7 +31,7 @@ typedef enum {
int32_t mnodeInitMnodes(); int32_t mnodeInitMnodes();
void mnodeCleanupMnodes(); void mnodeCleanupMnodes();
int32_t mnodeAddMnode(int32_t dnodeId); void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm);
int32_t mnodeDropMnode(int32_t dnodeId); int32_t mnodeDropMnode(int32_t dnodeId);
void mnodeDropMnodeLocal(int32_t dnodeId); void mnodeDropMnodeLocal(int32_t dnodeId);
......
...@@ -147,7 +147,7 @@ static int32_t mnodeDnodeActionRestored() { ...@@ -147,7 +147,7 @@ static int32_t mnodeDnodeActionRestored() {
mnodeCreateDnode(tsLocalEp, NULL); mnodeCreateDnode(tsLocalEp, NULL);
SDnodeObj *pDnode = mnodeGetDnodeByEp(tsLocalEp); SDnodeObj *pDnode = mnodeGetDnodeByEp(tsLocalEp);
if (pDnode != NULL) { if (pDnode != NULL) {
mnodeAddMnode(pDnode->dnodeId); mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, false);
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
} }
} }
......
...@@ -23,6 +23,8 @@ ...@@ -23,6 +23,8 @@
#include "tutil.h" #include "tutil.h"
#include "tsocket.h" #include "tsocket.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "dnode.h"
#include "mnode.h"
#include "mnodeDef.h" #include "mnodeDef.h"
#include "mnodeInt.h" #include "mnodeInt.h"
#include "mnodeMnode.h" #include "mnodeMnode.h"
...@@ -30,6 +32,7 @@ ...@@ -30,6 +32,7 @@
#include "mnodeSdb.h" #include "mnodeSdb.h"
#include "mnodeShow.h" #include "mnodeShow.h"
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeVgroup.h"
static void * tsMnodeSdb = NULL; static void * tsMnodeSdb = NULL;
static int32_t tsMnodeUpdateSize = 0; static int32_t tsMnodeUpdateSize = 0;
...@@ -266,7 +269,46 @@ void mnodeGetMnodeInfos(void *mnodeInfos) { ...@@ -266,7 +269,46 @@ void mnodeGetMnodeInfos(void *mnodeInfos) {
mnodeMnodeUnLock(); mnodeMnodeUnLock();
} }
int32_t mnodeAddMnode(int32_t dnodeId) { static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) {
mDebug("dnode:%d, send create mnode msg to dnode %s", dnodeId, dnodeEp);
SMDCreateMnodeMsg *pCreate = rpcMallocCont(sizeof(SMDCreateMnodeMsg));
if (pCreate == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
} else {
pCreate->dnodeId = dnodeId;
tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
}
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pCreate;
rpcMsg.contLen = sizeof(SMDCreateMnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_MD_CREATE_MNODE;
SRpcMsg rpcRsp = {0};
SRpcEpSet epSet = mnodeGetEpSetFromIp(pCreate->dnodeEp);
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp, &epSet);
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
mError("dnode:%d, failed to send create mnode msg, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(rpcRsp.code));
}
rpcFreeCont(rpcRsp.pCont);
return rpcRsp.code;
}
static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
mError("failed to create mnode, reason:%s", tstrerror(code));
} else {
mDebug("mnode is created");
mnodeUpdateMnodeEpSet();
}
return code;
}
void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj)); SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
pMnode->mnodeId = dnodeId; pMnode->mnodeId = dnodeId;
pMnode->createdTime = taosGetTimestampMs(); pMnode->createdTime = taosGetTimestampMs();
...@@ -275,16 +317,24 @@ int32_t mnodeAddMnode(int32_t dnodeId) { ...@@ -275,16 +317,24 @@ int32_t mnodeAddMnode(int32_t dnodeId) {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsMnodeSdb, .table = tsMnodeSdb,
.pObj = pMnode, .pObj = pMnode,
.writeCb = mnodeCreateMnodeCb
}; };
int32_t code = sdbInsertRow(&oper); int32_t code = TSDB_CODE_SUCCESS;
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (needConfirm) {
taosTFree(pMnode); code = mnodeSendCreateMnodeMsg(dnodeId, dnodeEp);
} }
mnodeUpdateMnodeEpSet(); if (code != TSDB_CODE_SUCCESS) {
taosTFree(pMnode);
return;
}
return code; code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnode:%d, failed to create mnode, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(code));
taosTFree(pMnode);
}
} }
void mnodeDropMnodeLocal(int32_t dnodeId) { void mnodeDropMnodeLocal(int32_t dnodeId) {
......
...@@ -58,10 +58,15 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { ...@@ -58,10 +58,15 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("%p, msg:%s in mpeer queue, will be redireced, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, mDebug("%p, msg:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i])); if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
} }
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
......
...@@ -51,14 +51,21 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { ...@@ -51,14 +51,21 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp; SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
mnodeGetMnodeEpSetForShell(epSet); mnodeGetMnodeEpSetForShell(epSet);
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet);
mDebug("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->inUse); mDebug("%p, msg:%s in mread queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i])); if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
} }
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet);
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
} }
......
...@@ -1038,7 +1038,7 @@ static void *sdbWorkerFp(void *param) { ...@@ -1038,7 +1038,7 @@ static void *sdbWorkerFp(void *param) {
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(tsSdbWriteQset, tsSdbWriteQall, &unUsed); numOfMsgs = taosReadAllQitemsFromQset(tsSdbWriteQset, tsSdbWriteQall, &unUsed);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
sdbDebug("sdbWorkerFp: got no message from qset, exiting..."); sdbDebug("qset:%p, sdb got no message from qset, exiting", tsSdbWriteQset);
break; break;
} }
......
...@@ -54,11 +54,17 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { ...@@ -54,11 +54,17 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("app:%p:%p, msg:%s will be redireced inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType], mDebug("app:%p:%p, msg:%s in write queue, will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, pMsg,
epSet->inUse); taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
mDebug("app:%p:%p, mnode index:%d ep:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i], if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
htons(epSet->port[i])); epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("app:%p:%p, mnode index:%d ep:%s:%d, set inUse to %d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i],
htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("app:%p:%p, mnode index:%d ep:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i],
htons(epSet->port[i]));
}
} }
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
......
...@@ -67,7 +67,7 @@ static void *httpProcessResultQueue(void *param) { ...@@ -67,7 +67,7 @@ static void *httpProcessResultQueue(void *param) {
while (1) { while (1) {
if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) { if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) {
httpDebug("httpResultQueue: got no message from qset, exiting..."); httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset);
break; break;
} }
......
...@@ -1120,9 +1120,13 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1120,9 +1120,13 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content; SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content;
if (pEpSet->numOfEps > 0) { if (pEpSet->numOfEps > 0) {
memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet)); memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
tDebug("%s, redirect is received, numOfEps:%d", pConn->info, pContext->epSet.numOfEps); tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,
for (int i=0; i<pContext->epSet.numOfEps; ++i) pContext->epSet.inUse);
for (int i = 0; i < pContext->epSet.numOfEps; ++i) {
pContext->epSet.port[i] = htons(pContext->epSet.port[i]); pContext->epSet.port[i] = htons(pContext->epSet.port[i]);
tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.fqdn[i],
pContext->epSet.port[i]);
}
} }
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
......
...@@ -263,6 +263,7 @@ void taosCloseQset(taos_qset param) { ...@@ -263,6 +263,7 @@ void taosCloseQset(taos_qset param) {
// thread to exit. // thread to exit.
void taosQsetThreadResume(taos_qset param) { void taosQsetThreadResume(taos_qset param) {
STaosQset *qset = (STaosQset *)param; STaosQset *qset = (STaosQset *)param;
uDebug("qset:%p, it will exit", qset);
tsem_post(&qset->sem); tsem_post(&qset->sem);
} }
......
...@@ -111,24 +111,24 @@ echo "serverPort ${NODE}" >> $TAOS_CFG ...@@ -111,24 +111,24 @@ echo "serverPort ${NODE}" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 0" >> $TAOS_CFG echo "debugFlag 0" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG echo "mDebugFlag 143" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG echo "sdbDebugFlag 143" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG echo "dDebugFlag 143" >> $TAOS_CFG
echo "vDebugFlag 135" >> $TAOS_CFG echo "vDebugFlag 143" >> $TAOS_CFG
echo "tsdbDebugFlag 135" >> $TAOS_CFG echo "tsdbDebugFlag 143" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 143" >> $TAOS_CFG
echo "jnidebugFlag 135" >> $TAOS_CFG echo "jnidebugFlag 143" >> $TAOS_CFG
echo "odbcdebugFlag 135" >> $TAOS_CFG echo "odbcdebugFlag 143" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG echo "httpDebugFlag 143" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG echo "monitorDebugFlag 143" >> $TAOS_CFG
echo "mqttDebugFlag 135" >> $TAOS_CFG echo "mqttDebugFlag 143" >> $TAOS_CFG
echo "qdebugFlag 135" >> $TAOS_CFG echo "qdebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG echo "udebugFlag 143" >> $TAOS_CFG
echo "sdebugFlag 135" >> $TAOS_CFG echo "sdebugFlag 143" >> $TAOS_CFG
echo "wdebugFlag 135" >> $TAOS_CFG echo "wdebugFlag 143" >> $TAOS_CFG
echo "cqdebugFlag 135" >> $TAOS_CFG echo "cqdebugFlag 143" >> $TAOS_CFG
echo "monitor 0" >> $TAOS_CFG echo "monitor 0" >> $TAOS_CFG
echo "monitorInterval 1" >> $TAOS_CFG echo "monitorInterval 1" >> $TAOS_CFG
echo "http 0" >> $TAOS_CFG echo "http 0" >> $TAOS_CFG
......
...@@ -109,15 +109,10 @@ echo "dataDir $DATA_DIR" >> $TAOS_CFG ...@@ -109,15 +109,10 @@ echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "scriptDir ${CODE_DIR}/../script" >> $TAOS_CFG echo "scriptDir ${CODE_DIR}/../script" >> $TAOS_CFG
echo "numOfLogLines 100000000" >> $TAOS_CFG echo "numOfLogLines 100000000" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 143" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG echo "udebugFlag 143" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG
echo "tablemetakeeptimer 5" >> $TAOS_CFG echo "tablemetakeeptimer 5" >> $TAOS_CFG
echo "wal 0" >> $TAOS_CFG echo "wal 0" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG echo "asyncLog 0" >> $TAOS_CFG
......
...@@ -25,12 +25,12 @@ sql create dnode $hostname2 ...@@ -25,12 +25,12 @@ sql create dnode $hostname2
$x = 0 $x = 0
show2: show2:
$x = $x + 1 $x = $x + 1
sleep 2000 sleep 4000
if $x == 10 then if $x == 5 then
return -1 return -1
endi endi
sql show mnodes sql show mnodes -x show2
print dnode1 ==> $data2_1 print dnode1 ==> $data2_1
print dnode2 ==> $data2_2 print dnode2 ==> $data2_2
if $data2_1 != master then if $data2_1 != master then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册