From 3aef011db25b769cc511518fbe7a912c6bff0f6d Mon Sep 17 00:00:00 2001 From: slguan Date: Tue, 18 Feb 2020 23:58:53 +0800 Subject: [PATCH] dnodeMgmt.h --- src/dnode/inc/dnodeMgmt.h | 2 +- src/dnode/src/dnodeMgmt.c | 60 ++++++++++------------- src/inc/dnode.h | 11 ++--- src/inc/trpc.h | 3 ++ src/mnode/inc/mgmtDnodeInt.h | 7 +-- src/mnode/src/mgmtDnodeInt.c | 93 +++++++++++++++--------------------- 6 files changed, 74 insertions(+), 102 deletions(-) diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 4ee0d57cfa..5db16258f7 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -26,7 +26,7 @@ extern "C" { #include "dnode.h" -void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); +void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); extern void *tsDnodeMgmtQhandle; diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index ef4f26a889..282ff8e756 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -31,43 +31,41 @@ static int (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, void *pConn); static void dnodeInitProcessShellMsg(); -void taosSendMsgToMnodeImpFp(SSchedMsg *sched) { - char msgType = *sched->msg; - char *content = sched->msg + sizeof(int32_t); +void dnodeSendMsgToMnodeImpFp(SSchedMsg *sched) { + int8_t msgType = *(sched->msg - 1); + int8_t *pCont = sched->msg; + int32_t contLen = (int32_t) sched->ahandle; + void *pConn = NULL; - mgmtProcessMsgFromDnode(content, 0, msgType, NULL); + mgmtProcessMsgFromDnode(pCont, contLen, msgType, pConn); rpcFreeCont(sched->msg); } -int32_t taosSendMsgToMnodeImp(int8_t *msg, int32_t msgLen) { - dTrace("msg:%s is sent to mnode", taosMsg[(int32_t)(*(msg-sizeof(int32_t)))]); +int32_t dnodeSendMsgToMnodeImp(int8_t *pCont, int32_t contLen, int8_t msgType) { + dTrace("msg:%s is sent to mnode", taosMsg[msgType]); + *(pCont-1) = msgType; - /* - * Lite version has no message header, so minus one - */ SSchedMsg schedMsg; - schedMsg.fp = taosSendMsgToMnodeImpFp; - schedMsg.msg = msg - sizeof(int32_t); - schedMsg.ahandle = NULL; + schedMsg.fp = dnodeSendMsgToMnodeImpFp; + schedMsg.msg = pCont; + schedMsg.ahandle = (void*)contLen; schedMsg.thandle = NULL; taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); - return 0; + return TSDB_CODE_SUCCESS; } -int32_t (*taosSendMsgToMnode)(char *msg, int32_t msgLen) = taosSendMsgToMnodeImp; -int32_t taosSendSimpleRspToMnodeImp(int32_t rsptype, int32_t code) { - char *pStart = taosBuildRspMsgToMnode(0, rsptype); - if (pStart == NULL) { - return 0; - } +int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType) = dnodeSendMsgToMnodeImp; - *pStart = code; - taosSendMsgToMnode(0, pStart, code); +int32_t dnodeSendSimpleRspToMnodeImp(int32_t msgType, int32_t code) { + int8_t *pCont = rpcMallocCont(sizeof(int32_t)); + *(int32_t *) pCont = code; - return 0; + dnodeSendMsgToMnodeImp(pCont, sizeof(int32_t), msgType); + return TSDB_CODE_SUCCESS; } -int (*taosSendSimpleRspToMnode)(int32_t rsptype, int32_t code) = taosSendSimpleRspToMnodeImp; + +int32_t (*dnodeSendSimpleRspToMnode)(int32_t msgType, int32_t code) = dnodeSendSimpleRspToMnodeImp; int32_t dnodeInitMgmtImp() { dnodeInitProcessShellMsg(); @@ -80,17 +78,7 @@ void dnodeInitMgmtIpImp() {} void (*dnodeInitMgmtIp)() = dnodeInitMgmtIpImp; -void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched) { - int32_t msgType = *(int32_t*)(sched->msg); - int8_t *content = sched->msg + sizeof(int32_t); - - dTrace("msg:%s is received from mnode", taosMsg[msgType]); - dnodeDistributeMsgFromMgmt(content, 0, msgType, NULL); - - free(sched->msg); -} - -void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) { +void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { dError("invalid msg type:%d", msgType); } else { @@ -202,7 +190,7 @@ void dnodeSendVpeerCfgMsg(int32_t vnode) { } cfg->vnode = htonl(vnode); - taosSendMsgToMnode(cfg, sizeof(SMeterCfgMsg)); + dnodeSendMsgToMnode(cfg, sizeof(SMeterCfgMsg)); } void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { @@ -212,7 +200,7 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { } cfg->vnode = htonl(vnode); - taosSendMsgToMnode(cfg, sizeof(SMeterCfgMsg)); + dnodeSendMsgToMnode(cfg, sizeof(SMeterCfgMsg)); } void dnodeInitProcessShellMsg() { diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 0970328806..3813dd7276 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -45,22 +45,21 @@ extern uint32_t tsRebootTime; extern void (*dnodeStartModules)(); extern void (*dnodeParseParameterK)(); extern int32_t (*dnodeCheckSystem)(); -extern char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size); -extern char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size); -extern char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type); -extern char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type); -extern int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code); extern void (*dnodeInitMgmtIp)(); extern int (*dnodeInitMgmt)(); +int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType); +int32_t (*dnodeSendSimpleRspToMnode)(int32_t msgType, int32_t code); + + // multilevelStorage extern int32_t (*dnodeInitStorage)(); extern void (*dnodeCleanupStorage)(); void dnodeCheckDataDirOpenned(const char* dir); -void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched); +void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); void dnodeLockVnodes(); diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 094c2972ab..b97837a3bf 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -19,6 +19,9 @@ extern "C" { #endif +#include +#include + #define TAOS_CONN_UDPS 0 #define TAOS_CONN_UDPC 1 #define TAOS_CONN_TCPS 2 diff --git a/src/mnode/inc/mgmtDnodeInt.h b/src/mnode/inc/mgmtDnodeInt.h index a81b197f2c..e9d6aa1a26 100644 --- a/src/mnode/inc/mgmtDnodeInt.h +++ b/src/mnode/inc/mgmtDnodeInt.h @@ -38,14 +38,11 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid); char *taosBuildRspMsgToDnode(SDnodeObj *pObj, char type); char *taosBuildReqMsgToDnode(SDnodeObj *pObj, char type); -extern char* (*taosBuildRspMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size); -extern char* (*taosBuildReqMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size); -extern int32_t (*taosSendSimpleRspToDnode)(SDnodeObj *pObj, char rsptype, char code); -extern int32_t (*taosSendMsgToDnode)(SDnodeObj *pObj, char *msg, int32_t msgLen); +extern int32_t (*mgmtSendSimpleRspToDnode)(int32_t msgType, int32_t code); +extern int32_t (*mgmtSendMsgToDnode)(SDnodeObj *pObj, char *msg, int msgLen); extern int32_t (*mgmtInitDnodeInt)(); extern void (*mgmtCleanUpDnodeInt)(); extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId); -extern void (*mgmtProcessMsgFromDnodeSpec)(SSchedMsg *sched); #ifdef __cplusplus diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index a2657dfc40..b8e740925c 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -50,7 +50,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { SVgObj * pVgroup; if (!sdbMaster) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_REDIRECT); + mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_REDIRECT); return 0; } @@ -59,7 +59,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { pStart = taosBuildRspMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, 64000); if (pStart == NULL) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } @@ -88,7 +88,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { } msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); return 0; } @@ -100,7 +100,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { SVgObj * pVgroup = NULL; if (!sdbMaster) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_REDIRECT); + mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_REDIRECT); return 0; } @@ -108,7 +108,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP); if (pStart == NULL) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } pMsg = pStart; @@ -129,7 +129,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { } msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); return 0; } @@ -142,7 +142,7 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) { STaosRsp *pRsp = (STaosRsp *)msg; if (!sdbMaster) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP, TSDB_CODE_REDIRECT); + mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP, TSDB_CODE_REDIRECT); return 0; } @@ -251,7 +251,7 @@ int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int int8_t *pMsg = mgmtBuildCreateChildTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode, tagDataLen, pTagData); int32_t msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); } pVgroup->lastCreate = timeStamp; @@ -275,7 +275,7 @@ int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) { int8_t *pMsg = mgmtBuildCreateStreamTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode); int32_t msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); } pVgroup->lastCreate = timeStamp; @@ -299,7 +299,7 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { int8_t *pMsg = mgmtBuildCreateNormalTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode); int32_t msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); } pVgroup->lastCreate = timeStamp; @@ -334,7 +334,7 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) { pMsg += sizeof(SDRemoveTableMsg); msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip); mTrace("dnode:%s vid:%d, send remove meter msg, sid:%d status:%d", ipstr, pVgroup->vnodeGid[i].vnode, @@ -371,7 +371,7 @@ int mgmtSendAlterStreamMsgToDnode(STabObj *pTable, SVgObj *pVgroup) { pMsg += sizeof(SAlterStreamMsg); msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); } return 0; @@ -433,7 +433,7 @@ int mgmtSendVPeersMsg(SVgObj *pVgroup) { pMsg = mgmtBuildVpeersIe(pStart, pVgroup, pVgroup->vnodeGid[i].vnode); msgLen = pMsg - pStart; - taosSendMsgToDnode(pDnode, pStart, msgLen); + mgmtSendMsgToDnode(pDnode, pStart, msgLen); } } @@ -467,7 +467,7 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid) { pMsg += sizeof(SFreeVnodeMsg); msgLen = pMsg - pStart; - taosSendMsgToDnode(pDnode, pStart, msgLen); + mgmtSendMsgToDnode(pDnode, pStart, msgLen); return 0; } @@ -547,7 +547,7 @@ int mgmtSendCfgDnodeMsg(char *cont) { pMsg += sizeof(SCfgMsg); msgLen = pMsg - pStart; - taosSendMsgToDnode(pDnode, pStart, msgLen); + mgmtSendMsgToDnode(pDnode, pStart, msgLen); #else (void)tsCfgDynamicOptions(pCfg->config); #endif @@ -564,56 +564,41 @@ void * mgmtStatusTimer = NULL; void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj); -char* taosBuildRspMsgToDnodeWithSizeImp(SDnodeObj *pObj, char type, int32_t size) { - char *pStart = (char *)malloc(size); - if (pStart == NULL) { - return NULL; - } +void mgmtSendMsgToDnodeImpFp(SSchedMsg *sched) { + int8_t msgType = *(sched->msg - 1); + int8_t *pCont = sched->msg; + int32_t contLen = (int32_t) sched->ahandle; + void *pConn = NULL; - *pStart = type; - return pStart + 1; + dnodeProcessMsgFromMgmt(pCont, contLen, msgType, pConn); + rpcFreeCont(sched->msg); } -char* (*taosBuildRspMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size) = taosBuildRspMsgToDnodeWithSizeImp; - -char* taosBuildReqMsgToDnodeWithSizeImp(SDnodeObj *pObj, char type, int32_t size) { - char *pStart = (char *)malloc(size); - if (pStart == NULL) { - return NULL; - } - *pStart = type; - return pStart + 1; -} -char* (*taosBuildReqMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size) = taosBuildReqMsgToDnodeWithSizeImp; +int32_t mgmtSendMsgToDnodeImp(int8_t *pCont, int32_t contLen, int8_t msgType) { + mTrace("msg:%s is sent to dnode", taosMsg[msgType]); + *(pCont-1) = msgType; -char *taosBuildRspMsgToDnode(SDnodeObj *pObj, char type) { - return taosBuildRspMsgToDnodeWithSize(pObj, type, 256); -} + SSchedMsg schedMsg; + schedMsg.fp = mgmtSendMsgToDnodeImpFp; + schedMsg.msg = pCont; + schedMsg.ahandle = (void*)contLen; + schedMsg.thandle = NULL; + taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); -char *taosBuildReqMsgToDnode(SDnodeObj *pObj, char type) { - return taosBuildReqMsgToDnodeWithSize(pObj, type, 256); + return TSDB_CODE_SUCCESS; } -int32_t taosSendSimpleRspToDnodeImp(SDnodeObj *pObj, char rsptype, char code) { return 0; } -int32_t (*taosSendSimpleRspToDnode)(SDnodeObj *pObj, char rsptype, char code) = taosSendSimpleRspToDnodeImp; +int32_t (*mgmtSendMsgToDnode)(SDnodeObj *pObj, char *msg, int msgLen) = mgmtSendMsgToDnodeImp; -int32_t taosSendMsgToDnodeImp(SDnodeObj *pObj, char *msg, int32_t msgLen) { - mTrace("msg:%s is sent to dnode", taosMsg[(uint8_t)(*(msg-1))]); - - /* - * Lite version has no message header, so minus one - */ - SSchedMsg schedMsg; - schedMsg.fp = dnodeProcessMsgFromMgmtImp; - schedMsg.msg = msg - 1; - schedMsg.ahandle = NULL; - schedMsg.thandle = NULL; - taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); +int32_t mgmtSendSimpleRspToDnodeImp(int32_t msgType, int32_t code) { + int8_t *pCont = rpcMallocCont(sizeof(int32_t)); + *(int32_t *) pCont = code; - return 0; + mgmtSendMsgToDnodeImp(pCont, sizeof(int32_t), msgType); + return TSDB_CODE_SUCCESS; } -int32_t (*taosSendMsgToDnode)(SDnodeObj *pObj, char *msg, int msgLen) = taosSendMsgToDnodeImp; +int32_t (*mgmtSendSimpleRspToDnode)(int32_t msgType, int32_t code) = mgmtSendSimpleRspToDnodeImp; int32_t mgmtInitDnodeIntImp() { return 0; } int32_t (*mgmtInitDnodeInt)() = mgmtInitDnodeIntImp; -- GitLab