提交 86402cc9 编写于 作者: S slguan

conection from mgmt to dnode

上级 ce906917
......@@ -23,12 +23,17 @@ extern "C" {
#include <stdint.h>
#include <stdbool.h>
void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
int32_t dnodeInitMgmt();
void dnodeInitMgmtIp();
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code);
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen);
void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen);
void dnodeSendVpeerCfgMsg(int32_t vnode);
void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid);
extern int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType);
extern int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code);
#ifdef __cplusplus
}
......
......@@ -27,10 +27,15 @@
#include "dnodeWrite.h"
#include "dnodeVnodeMgmt.h"
static int32_t (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn);
void (*dnodeInitMgmtIpFp)() = NULL;
int32_t (*dnodeInitMgmtFp)() = NULL;
void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL;
void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL;
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn);
static void dnodeInitProcessShellMsg();
void dnodeSendMsgToMnodeImpFp(SSchedMsg *sched) {
static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) {
int8_t msgType = *(int8_t *) (sched->msg - sizeof(int32_t) - sizeof(int8_t));
int32_t contLen = *(int32_t *) (sched->msg - sizeof(int8_t));
int8_t *pCont = sched->msg;
......@@ -40,49 +45,58 @@ void dnodeSendMsgToMnodeImpFp(SSchedMsg *sched) {
rpcFreeCont(sched->msg);
}
int32_t dnodeSendMsgToMnodeImp(int8_t *pCont, int32_t contLen, int8_t msgType) {
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) {
dTrace("msg:%s is sent to mnode", taosMsg[msgType]);
*(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType;
*(int32_t *) (pCont - sizeof(int8_t)) = contLen;
SSchedMsg schedMsg = {0};
schedMsg.fp = dnodeSendMsgToMnodeImpFp;
schedMsg.msg = pCont;
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
return TSDB_CODE_SUCCESS;
if (dnodeSendMsgToMnodeFp) {
dnodeSendMsgToMnodeFp(msgType, pCont, contLen);
} else {
SSchedMsg schedMsg = {0};
schedMsg.fp = dnodeSendMsgToMnodeQueueFp;
schedMsg.msg = pCont;
*(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType;
*(int32_t *) (pCont - sizeof(int8_t)) = contLen;
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
}
}
int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType) = dnodeSendMsgToMnodeImp;
int32_t dnodeSendSimpleRspToMnodeImp(void *pConn, int32_t msgType, int32_t code) {
int8_t *pCont = rpcMallocCont(sizeof(int32_t));
*(int32_t *) pCont = code;
dnodeSendMsgToMnodeImp(pCont, sizeof(int32_t), msgType);
return TSDB_CODE_SUCCESS;
void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) {
dTrace("rsp:%s is sent to mnode", taosMsg[msgType]);
if (tsIsCluster) {
rpcSendResponse(pConn, code, pCont, contLen);
} else {
SSchedMsg schedMsg = {0};
schedMsg.fp = dnodeSendMsgToMnodeFp;
schedMsg.msg = pCont;
*(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType;
*(int32_t *) (pCont - sizeof(int8_t)) = contLen;
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
}
}
int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code) = dnodeSendSimpleRspToMnodeImp;
int32_t dnodeInitMgmt() {
if (dnodeInitMgmtFp) {
dnodeInitMgmtFp();
}
int32_t dnodeInitMgmtImp() {
dnodeInitProcessShellMsg();
return 0;
}
int32_t (*dnodeInitMgmt)() = dnodeInitMgmtImp;
void dnodeInitMgmtIpImp() {}
void (*dnodeInitMgmtIp)() = dnodeInitMgmtIpImp;
void dnodeInitMgmtIp() {
if (dnodeInitMgmtIpFp) {
dnodeInitMgmtIpFp();
}
}
void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) {
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code) {
if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
dError("invalid msg type:%d", msgType);
} else {
if (dnodeProcessShellMsgFp[msgType]) {
(*dnodeProcessShellMsgFp[msgType])(pCont, contLen, msgType, pConn);
if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) {
dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn);
}
if (dnodeProcessMgmtMsgFp[msgType]) {
(*dnodeProcessMgmtMsgFp[msgType])(pCont, contLen, msgType, pConn);
} else {
dError("%s is not processed", taosMsg[msgType]);
}
......@@ -109,16 +123,29 @@ int32_t dnodeProcessTableCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType,
}
int32_t dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SDCreateTableMsg *table = (SDCreateTableMsg *) pCont;
int32_t code = dnodeCreateTable(table);
dnodeSendSimpleRspToMnode(pConn, msgType + 1, code);
SDCreateTableMsg *pTable = (SDCreateTableMsg *) pCont;
pTable->vnode = htonl(pTable->vnode);
pTable->sid = htonl(pTable->sid);
pTable->uid = htobe64(pTable->uid);
pTable->superTableUid = htobe64(pTable->superTableUid);
pTable->tableType = htonl(pTable->tableType);
pTable->sversion = htonl(pTable->sversion);
pTable->numOfColumns = htons(pTable->numOfColumns);
pTable->numOfTags = htons(pTable->numOfTags);
pTable->tagDataLen = htonl(pTable->tagDataLen);
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
pTable->createdTime = htobe64(pTable->createdTime);
int32_t code = dnodeCreateTable(pTable);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
return code;
}
int32_t dnodeProcessAlterStreamRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SAlterStreamMsg *stream = (SAlterStreamMsg *) pCont;
int32_t code = dnodeCreateStream(stream);
dnodeSendSimpleRspToMnode(pConn, msgType + 1, code);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
return code;
}
......@@ -130,7 +157,7 @@ int32_t dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, int8_t ms
dPrint("vnode:%d, sid:%d table is not configured, remove it", vnode, sid);
int32_t code = dnodeDropTable(vnode, sid, uid);
dnodeSendSimpleRspToMnode(pConn, msgType + 1, code);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
return code;
}
......@@ -159,7 +186,7 @@ int32_t dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, vo
dPrint("vnode:%d, start to config", vnode);
int32_t code = dnodeCreateVnode(vnode, vpeer);
dnodeSendSimpleRspToMnode(pConn, msgType + 1, code);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
return code;
}
......@@ -170,7 +197,7 @@ int32_t dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgT
dPrint("vnode:%d, remove it", vnode);
int32_t code = dnodeDropVnode(vnode);
dnodeSendSimpleRspToMnode(pConn, msgType + 1, code);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
return code;
}
......@@ -178,7 +205,7 @@ int32_t dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgT
int32_t dnodeProcessDnodeCfgRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont;
int32_t code = tsCfgDynamicOptions(pCfg->config);
dnodeSendSimpleRspToMnode(pConn, msgType + 1, code);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
return code;
}
......@@ -203,12 +230,12 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
}
void dnodeInitProcessShellMsg() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_VPEERS] = dnodeProcessVPeersMsg;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_VPEERS] = dnodeProcessVPeersMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp;
}
\ No newline at end of file
......@@ -44,13 +44,7 @@ extern uint32_t tsRebootTime;
extern void (*dnodeStartModules)();
extern void (*dnodeParseParameterK)();
extern int32_t (*dnodeCheckSystem)();
extern void (*dnodeInitMgmtIp)();
extern int (*dnodeInitMgmt)();
// dnodeMgmt
void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
extern int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType);
extern int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code);
// dnodeModule
extern void (*dnodeStartModules)();
......
......@@ -572,6 +572,11 @@ typedef struct {
SVnodeLoad load[];
} SStatusMsg;
typedef struct {
int32_t code;
SRpcIpSet ipList;
} SStatusRsp;
typedef struct {
uint32_t moduleStatus;
uint32_t createdTime;
......
......@@ -46,7 +46,7 @@ void mgmtSendMsgToDnodeImpFp(SSchedMsg *sched) {
int8_t *pCont = sched->msg;
void *pConn = NULL;
dnodeProcessMsgFromMgmt(pCont, contLen, msgType, pConn);
dnodeProcessMsgFromMgmt(msgType, pCont, contLen, pConn, TSDB_CODE_SUCCESS);
rpcFreeCont(sched->msg);
}
......@@ -187,6 +187,7 @@ int mgmtProcessVPeersRsp(int8_t *pCont, int32_t contLen, void *pConn) {
return 0;
}
void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) {
if (msgType == TSDB_MSG_TYPE_TABLE_CFG) {
mgmtProcessMeterCfgMsg(pCont, contLen, pConn);
......
......@@ -91,7 +91,7 @@ int32_t mgmtInitShell() {
}
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;;
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
rpcInit.localPort = tsMgmtShellPort;
rpcInit.label = "MND-shell";
rpcInit.numOfThreads = numOfThreads;
......
......@@ -58,7 +58,7 @@ extern char osName[];
extern char tsMasterIp[];
extern char tsSecondIp[];
extern uint16_t tsMgmtVnodePort;
extern uint16_t tsMgmtDnodePort;
extern uint16_t tsMgmtShellPort;
extern uint16_t tsVnodeShellPort;
extern uint16_t tsVnodeVnodePort;
......
......@@ -60,7 +60,7 @@ char tsMasterIp[TSDB_IPv4ADDR_LEN] = {0};
char tsSecondIp[TSDB_IPv4ADDR_LEN] = {0};
uint16_t tsMgmtShellPort = 6030; // udp[6030-6034] tcp[6030]
uint16_t tsVnodeShellPort = 6035; // udp[6035-6039] tcp[6035]
uint16_t tsMgmtVnodePort = 6040; // udp[6040-6044] tcp[6040]
uint16_t tsMgmtDnodePort = 6040; // udp[6040-6044] tcp[6040]
uint16_t tsVnodeVnodePort = 6045; // tcp[6045]
uint16_t tsMgmtMgmtPort = 6050; // udp, numOfVnodes fixed to 1, range udp[6050]
uint16_t tsMgmtSyncPort = 6050; // tcp, range tcp[6050]
......@@ -494,7 +494,7 @@ static void doInitGlobalConfig() {
tsInitConfigOption(cfg++, "vnodeShellPort", &tsVnodeShellPort, TSDB_CFG_VTYPE_SHORT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT,
1, 65535, 0, TSDB_CFG_UTYPE_NONE);
tsInitConfigOption(cfg++, "mgmtVnodePort", &tsMgmtVnodePort, TSDB_CFG_VTYPE_SHORT,
tsInitConfigOption(cfg++, "mgmtVnodePort", &tsMgmtDnodePort, TSDB_CFG_VTYPE_SHORT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLUSTER,
1, 65535, 0, TSDB_CFG_UTYPE_NONE);
tsInitConfigOption(cfg++, "vnodeVnodePort", &tsVnodeVnodePort, TSDB_CFG_VTYPE_SHORT,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册