diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 9530b6a4cba6fe1ad3eb7c271fa4957b5e1159d6..aae3bb31186d67d1bbafc36fd3336b68d37699e7 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -23,12 +23,17 @@ extern "C" { #include #include -void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); +int32_t dnodeInitMgmt(); +void dnodeInitMgmtIp(); + +void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code); +void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen); +void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); + void dnodeSendVpeerCfgMsg(int32_t vnode); void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid); -extern int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType); -extern int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code); + #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 63ef1814f0bc7ad7d4db5e8754184416d04433f2..68632dc41fac029fa62a1670619b54f5a5bb72b3 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -27,10 +27,15 @@ #include "dnodeWrite.h" #include "dnodeVnodeMgmt.h" -static int32_t (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn); +void (*dnodeInitMgmtIpFp)() = NULL; +int32_t (*dnodeInitMgmtFp)() = NULL; +void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL; +void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL; + +static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn); static void dnodeInitProcessShellMsg(); -void dnodeSendMsgToMnodeImpFp(SSchedMsg *sched) { +static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) { int8_t msgType = *(int8_t *) (sched->msg - sizeof(int32_t) - sizeof(int8_t)); int32_t contLen = *(int32_t *) (sched->msg - sizeof(int8_t)); int8_t *pCont = sched->msg; @@ -40,49 +45,58 @@ void dnodeSendMsgToMnodeImpFp(SSchedMsg *sched) { rpcFreeCont(sched->msg); } -int32_t dnodeSendMsgToMnodeImp(int8_t *pCont, int32_t contLen, int8_t msgType) { +void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) { dTrace("msg:%s is sent to mnode", taosMsg[msgType]); - *(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType; - *(int32_t *) (pCont - sizeof(int8_t)) = contLen; - - SSchedMsg schedMsg = {0}; - schedMsg.fp = dnodeSendMsgToMnodeImpFp; - schedMsg.msg = pCont; - - taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); - - return TSDB_CODE_SUCCESS; + if (dnodeSendMsgToMnodeFp) { + dnodeSendMsgToMnodeFp(msgType, pCont, contLen); + } else { + SSchedMsg schedMsg = {0}; + schedMsg.fp = dnodeSendMsgToMnodeQueueFp; + schedMsg.msg = pCont; + *(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType; + *(int32_t *) (pCont - sizeof(int8_t)) = contLen; + taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); + } } -int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType) = dnodeSendMsgToMnodeImp; - -int32_t dnodeSendSimpleRspToMnodeImp(void *pConn, int32_t msgType, int32_t code) { - int8_t *pCont = rpcMallocCont(sizeof(int32_t)); - *(int32_t *) pCont = code; - - dnodeSendMsgToMnodeImp(pCont, sizeof(int32_t), msgType); - return TSDB_CODE_SUCCESS; +void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) { + dTrace("rsp:%s is sent to mnode", taosMsg[msgType]); + if (tsIsCluster) { + rpcSendResponse(pConn, code, pCont, contLen); + } else { + SSchedMsg schedMsg = {0}; + schedMsg.fp = dnodeSendMsgToMnodeFp; + schedMsg.msg = pCont; + *(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType; + *(int32_t *) (pCont - sizeof(int8_t)) = contLen; + taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); + } } -int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code) = dnodeSendSimpleRspToMnodeImp; +int32_t dnodeInitMgmt() { + if (dnodeInitMgmtFp) { + dnodeInitMgmtFp(); + } -int32_t dnodeInitMgmtImp() { dnodeInitProcessShellMsg(); return 0; } -int32_t (*dnodeInitMgmt)() = dnodeInitMgmtImp; - -void dnodeInitMgmtIpImp() {} - -void (*dnodeInitMgmtIp)() = dnodeInitMgmtIpImp; +void dnodeInitMgmtIp() { + if (dnodeInitMgmtIpFp) { + dnodeInitMgmtIpFp(); + } +} -void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) { +void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { dError("invalid msg type:%d", msgType); } else { - if (dnodeProcessShellMsgFp[msgType]) { - (*dnodeProcessShellMsgFp[msgType])(pCont, contLen, msgType, pConn); + if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) { + dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn); + } + if (dnodeProcessMgmtMsgFp[msgType]) { + (*dnodeProcessMgmtMsgFp[msgType])(pCont, contLen, msgType, pConn); } else { dError("%s is not processed", taosMsg[msgType]); } @@ -109,16 +123,29 @@ int32_t dnodeProcessTableCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, } int32_t dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { - SDCreateTableMsg *table = (SDCreateTableMsg *) pCont; - int32_t code = dnodeCreateTable(table); - dnodeSendSimpleRspToMnode(pConn, msgType + 1, code); + SDCreateTableMsg *pTable = (SDCreateTableMsg *) pCont; + pTable->vnode = htonl(pTable->vnode); + pTable->sid = htonl(pTable->sid); + pTable->uid = htobe64(pTable->uid); + pTable->superTableUid = htobe64(pTable->superTableUid); + pTable->tableType = htonl(pTable->tableType); + pTable->sversion = htonl(pTable->sversion); + pTable->numOfColumns = htons(pTable->numOfColumns); + pTable->numOfTags = htons(pTable->numOfTags); + pTable->tagDataLen = htonl(pTable->tagDataLen); + pTable->sqlDataLen = htonl(pTable->sqlDataLen); + pTable->createdTime = htobe64(pTable->createdTime); + + int32_t code = dnodeCreateTable(pTable); + dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); + return code; } int32_t dnodeProcessAlterStreamRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { SAlterStreamMsg *stream = (SAlterStreamMsg *) pCont; int32_t code = dnodeCreateStream(stream); - dnodeSendSimpleRspToMnode(pConn, msgType + 1, code); + dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); return code; } @@ -130,7 +157,7 @@ int32_t dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, int8_t ms dPrint("vnode:%d, sid:%d table is not configured, remove it", vnode, sid); int32_t code = dnodeDropTable(vnode, sid, uid); - dnodeSendSimpleRspToMnode(pConn, msgType + 1, code); + dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); return code; } @@ -159,7 +186,7 @@ int32_t dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, vo dPrint("vnode:%d, start to config", vnode); int32_t code = dnodeCreateVnode(vnode, vpeer); - dnodeSendSimpleRspToMnode(pConn, msgType + 1, code); + dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); return code; } @@ -170,7 +197,7 @@ int32_t dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgT dPrint("vnode:%d, remove it", vnode); int32_t code = dnodeDropVnode(vnode); - dnodeSendSimpleRspToMnode(pConn, msgType + 1, code); + dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); return code; } @@ -178,7 +205,7 @@ int32_t dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgT int32_t dnodeProcessDnodeCfgRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont; int32_t code = tsCfgDynamicOptions(pCfg->config); - dnodeSendSimpleRspToMnode(pConn, msgType + 1, code); + dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); return code; } @@ -203,12 +230,12 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { } void dnodeInitProcessShellMsg() { - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_VPEERS] = dnodeProcessVPeersMsg; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_VPEERS] = dnodeProcessVPeersMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp; } \ No newline at end of file diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 4aaf0e866ee47273a57617079821bda0a0246b22..65b17a88a0ba39b7d3d445557497f6cdef977ff1 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -44,13 +44,7 @@ extern uint32_t tsRebootTime; extern void (*dnodeStartModules)(); extern void (*dnodeParseParameterK)(); extern int32_t (*dnodeCheckSystem)(); -extern void (*dnodeInitMgmtIp)(); -extern int (*dnodeInitMgmt)(); -// dnodeMgmt -void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); -extern int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType); -extern int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code); // dnodeModule extern void (*dnodeStartModules)(); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 67efef10f2935493aa3a923f22da50ea0fa352ce..211e8d10bb05980dee3238fbd4548388e6996260 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -572,6 +572,11 @@ typedef struct { SVnodeLoad load[]; } SStatusMsg; +typedef struct { + int32_t code; + SRpcIpSet ipList; +} SStatusRsp; + typedef struct { uint32_t moduleStatus; uint32_t createdTime; diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 27e83abbcaa68598a50f377992931488ec27ce2e..088f76ea510d4312556abb1d3aafb0789b5e51ab 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -46,7 +46,7 @@ void mgmtSendMsgToDnodeImpFp(SSchedMsg *sched) { int8_t *pCont = sched->msg; void *pConn = NULL; - dnodeProcessMsgFromMgmt(pCont, contLen, msgType, pConn); + dnodeProcessMsgFromMgmt(msgType, pCont, contLen, pConn, TSDB_CODE_SUCCESS); rpcFreeCont(sched->msg); } @@ -187,6 +187,7 @@ int mgmtProcessVPeersRsp(int8_t *pCont, int32_t contLen, void *pConn) { return 0; } + void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) { if (msgType == TSDB_MSG_TYPE_TABLE_CFG) { mgmtProcessMeterCfgMsg(pCont, contLen, pConn); diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index c55d3279faf3fbdbe97c9a52a96a66a996f850ce..a2a195d0c6858ccb23ac2ba3de2060ca63124414 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -91,7 +91,7 @@ int32_t mgmtInitShell() { } memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;; + rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; rpcInit.localPort = tsMgmtShellPort; rpcInit.label = "MND-shell"; rpcInit.numOfThreads = numOfThreads; diff --git a/src/util/inc/tglobalcfg.h b/src/util/inc/tglobalcfg.h index fa823c5a3b96017f1199534cd5b0baecc274f1f9..067eb389ecb1f889bda583e6f1fd36cf7acc45ab 100644 --- a/src/util/inc/tglobalcfg.h +++ b/src/util/inc/tglobalcfg.h @@ -58,7 +58,7 @@ extern char osName[]; extern char tsMasterIp[]; extern char tsSecondIp[]; -extern uint16_t tsMgmtVnodePort; +extern uint16_t tsMgmtDnodePort; extern uint16_t tsMgmtShellPort; extern uint16_t tsVnodeShellPort; extern uint16_t tsVnodeVnodePort; diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index 04978d537dbb9bf61513647215f592181bc648b4..73cb2bd24b3ca7c16b141aa5804530be233ea6f5 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -60,7 +60,7 @@ char tsMasterIp[TSDB_IPv4ADDR_LEN] = {0}; char tsSecondIp[TSDB_IPv4ADDR_LEN] = {0}; uint16_t tsMgmtShellPort = 6030; // udp[6030-6034] tcp[6030] uint16_t tsVnodeShellPort = 6035; // udp[6035-6039] tcp[6035] -uint16_t tsMgmtVnodePort = 6040; // udp[6040-6044] tcp[6040] +uint16_t tsMgmtDnodePort = 6040; // udp[6040-6044] tcp[6040] uint16_t tsVnodeVnodePort = 6045; // tcp[6045] uint16_t tsMgmtMgmtPort = 6050; // udp, numOfVnodes fixed to 1, range udp[6050] uint16_t tsMgmtSyncPort = 6050; // tcp, range tcp[6050] @@ -494,7 +494,7 @@ static void doInitGlobalConfig() { tsInitConfigOption(cfg++, "vnodeShellPort", &tsVnodeShellPort, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT, 1, 65535, 0, TSDB_CFG_UTYPE_NONE); - tsInitConfigOption(cfg++, "mgmtVnodePort", &tsMgmtVnodePort, TSDB_CFG_VTYPE_SHORT, + tsInitConfigOption(cfg++, "mgmtVnodePort", &tsMgmtDnodePort, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLUSTER, 1, 65535, 0, TSDB_CFG_UTYPE_NONE); tsInitConfigOption(cfg++, "vnodeVnodePort", &tsVnodeVnodePort, TSDB_CFG_VTYPE_SHORT,