提交 3aef011d 编写于 作者: S slguan

dnodeMgmt.h

上级 10f78bb5
......@@ -26,7 +26,7 @@ extern "C" {
#include "dnode.h"
void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
extern void *tsDnodeMgmtQhandle;
......
......@@ -31,43 +31,41 @@
static int (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, void *pConn);
static void dnodeInitProcessShellMsg();
void taosSendMsgToMnodeImpFp(SSchedMsg *sched) {
char msgType = *sched->msg;
char *content = sched->msg + sizeof(int32_t);
void dnodeSendMsgToMnodeImpFp(SSchedMsg *sched) {
int8_t msgType = *(sched->msg - 1);
int8_t *pCont = sched->msg;
int32_t contLen = (int32_t) sched->ahandle;
void *pConn = NULL;
mgmtProcessMsgFromDnode(content, 0, msgType, NULL);
mgmtProcessMsgFromDnode(pCont, contLen, msgType, pConn);
rpcFreeCont(sched->msg);
}
int32_t taosSendMsgToMnodeImp(int8_t *msg, int32_t msgLen) {
dTrace("msg:%s is sent to mnode", taosMsg[(int32_t)(*(msg-sizeof(int32_t)))]);
int32_t dnodeSendMsgToMnodeImp(int8_t *pCont, int32_t contLen, int8_t msgType) {
dTrace("msg:%s is sent to mnode", taosMsg[msgType]);
*(pCont-1) = msgType;
/*
* Lite version has no message header, so minus one
*/
SSchedMsg schedMsg;
schedMsg.fp = taosSendMsgToMnodeImpFp;
schedMsg.msg = msg - sizeof(int32_t);
schedMsg.ahandle = NULL;
schedMsg.fp = dnodeSendMsgToMnodeImpFp;
schedMsg.msg = pCont;
schedMsg.ahandle = (void*)contLen;
schedMsg.thandle = NULL;
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
return 0;
return TSDB_CODE_SUCCESS;
}
int32_t (*taosSendMsgToMnode)(char *msg, int32_t msgLen) = taosSendMsgToMnodeImp;
int32_t taosSendSimpleRspToMnodeImp(int32_t rsptype, int32_t code) {
char *pStart = taosBuildRspMsgToMnode(0, rsptype);
if (pStart == NULL) {
return 0;
}
int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType) = dnodeSendMsgToMnodeImp;
*pStart = code;
taosSendMsgToMnode(0, pStart, code);
int32_t dnodeSendSimpleRspToMnodeImp(int32_t msgType, int32_t code) {
int8_t *pCont = rpcMallocCont(sizeof(int32_t));
*(int32_t *) pCont = code;
return 0;
dnodeSendMsgToMnodeImp(pCont, sizeof(int32_t), msgType);
return TSDB_CODE_SUCCESS;
}
int (*taosSendSimpleRspToMnode)(int32_t rsptype, int32_t code) = taosSendSimpleRspToMnodeImp;
int32_t (*dnodeSendSimpleRspToMnode)(int32_t msgType, int32_t code) = dnodeSendSimpleRspToMnodeImp;
int32_t dnodeInitMgmtImp() {
dnodeInitProcessShellMsg();
......@@ -80,17 +78,7 @@ void dnodeInitMgmtIpImp() {}
void (*dnodeInitMgmtIp)() = dnodeInitMgmtIpImp;
void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched) {
int32_t msgType = *(int32_t*)(sched->msg);
int8_t *content = sched->msg + sizeof(int32_t);
dTrace("msg:%s is received from mnode", taosMsg[msgType]);
dnodeDistributeMsgFromMgmt(content, 0, msgType, NULL);
free(sched->msg);
}
void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) {
void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) {
if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
dError("invalid msg type:%d", msgType);
} else {
......@@ -202,7 +190,7 @@ void dnodeSendVpeerCfgMsg(int32_t vnode) {
}
cfg->vnode = htonl(vnode);
taosSendMsgToMnode(cfg, sizeof(SMeterCfgMsg));
dnodeSendMsgToMnode(cfg, sizeof(SMeterCfgMsg));
}
void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
......@@ -212,7 +200,7 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
}
cfg->vnode = htonl(vnode);
taosSendMsgToMnode(cfg, sizeof(SMeterCfgMsg));
dnodeSendMsgToMnode(cfg, sizeof(SMeterCfgMsg));
}
void dnodeInitProcessShellMsg() {
......
......@@ -45,22 +45,21 @@ extern uint32_t tsRebootTime;
extern void (*dnodeStartModules)();
extern void (*dnodeParseParameterK)();
extern int32_t (*dnodeCheckSystem)();
extern char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size);
extern char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size);
extern char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type);
extern char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type);
extern int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code);
extern void (*dnodeInitMgmtIp)();
extern int (*dnodeInitMgmt)();
int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType);
int32_t (*dnodeSendSimpleRspToMnode)(int32_t msgType, int32_t code);
// multilevelStorage
extern int32_t (*dnodeInitStorage)();
extern void (*dnodeCleanupStorage)();
void dnodeCheckDataDirOpenned(const char* dir);
void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched);
void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
void dnodeLockVnodes();
......
......@@ -19,6 +19,9 @@
extern "C" {
#endif
#include <stdbool.h>
#include <stdint.h>
#define TAOS_CONN_UDPS 0
#define TAOS_CONN_UDPC 1
#define TAOS_CONN_TCPS 2
......
......@@ -38,14 +38,11 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid);
char *taosBuildRspMsgToDnode(SDnodeObj *pObj, char type);
char *taosBuildReqMsgToDnode(SDnodeObj *pObj, char type);
extern char* (*taosBuildRspMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size);
extern char* (*taosBuildReqMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size);
extern int32_t (*taosSendSimpleRspToDnode)(SDnodeObj *pObj, char rsptype, char code);
extern int32_t (*taosSendMsgToDnode)(SDnodeObj *pObj, char *msg, int32_t msgLen);
extern int32_t (*mgmtSendSimpleRspToDnode)(int32_t msgType, int32_t code);
extern int32_t (*mgmtSendMsgToDnode)(SDnodeObj *pObj, char *msg, int msgLen);
extern int32_t (*mgmtInitDnodeInt)();
extern void (*mgmtCleanUpDnodeInt)();
extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId);
extern void (*mgmtProcessMsgFromDnodeSpec)(SSchedMsg *sched);
#ifdef __cplusplus
......
......@@ -50,7 +50,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
SVgObj * pVgroup;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_REDIRECT);
mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_REDIRECT);
return 0;
}
......@@ -59,7 +59,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
pStart = taosBuildRspMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, 64000);
if (pStart == NULL) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
......@@ -88,7 +88,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
}
msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
return 0;
}
......@@ -100,7 +100,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
SVgObj * pVgroup = NULL;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_REDIRECT);
mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_REDIRECT);
return 0;
}
......@@ -108,7 +108,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP);
if (pStart == NULL) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
pMsg = pStart;
......@@ -129,7 +129,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
}
msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
return 0;
}
......@@ -142,7 +142,7 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) {
STaosRsp *pRsp = (STaosRsp *)msg;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP, TSDB_CODE_REDIRECT);
mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP, TSDB_CODE_REDIRECT);
return 0;
}
......@@ -251,7 +251,7 @@ int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int
int8_t *pMsg = mgmtBuildCreateChildTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode, tagDataLen, pTagData);
int32_t msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
}
pVgroup->lastCreate = timeStamp;
......@@ -275,7 +275,7 @@ int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) {
int8_t *pMsg = mgmtBuildCreateStreamTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode);
int32_t msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
}
pVgroup->lastCreate = timeStamp;
......@@ -299,7 +299,7 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
int8_t *pMsg = mgmtBuildCreateNormalTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode);
int32_t msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
}
pVgroup->lastCreate = timeStamp;
......@@ -334,7 +334,7 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) {
pMsg += sizeof(SDRemoveTableMsg);
msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip);
mTrace("dnode:%s vid:%d, send remove meter msg, sid:%d status:%d", ipstr, pVgroup->vnodeGid[i].vnode,
......@@ -371,7 +371,7 @@ int mgmtSendAlterStreamMsgToDnode(STabObj *pTable, SVgObj *pVgroup) {
pMsg += sizeof(SAlterStreamMsg);
msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
}
return 0;
......@@ -433,7 +433,7 @@ int mgmtSendVPeersMsg(SVgObj *pVgroup) {
pMsg = mgmtBuildVpeersIe(pStart, pVgroup, pVgroup->vnodeGid[i].vnode);
msgLen = pMsg - pStart;
taosSendMsgToDnode(pDnode, pStart, msgLen);
mgmtSendMsgToDnode(pDnode, pStart, msgLen);
}
}
......@@ -467,7 +467,7 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid) {
pMsg += sizeof(SFreeVnodeMsg);
msgLen = pMsg - pStart;
taosSendMsgToDnode(pDnode, pStart, msgLen);
mgmtSendMsgToDnode(pDnode, pStart, msgLen);
return 0;
}
......@@ -547,7 +547,7 @@ int mgmtSendCfgDnodeMsg(char *cont) {
pMsg += sizeof(SCfgMsg);
msgLen = pMsg - pStart;
taosSendMsgToDnode(pDnode, pStart, msgLen);
mgmtSendMsgToDnode(pDnode, pStart, msgLen);
#else
(void)tsCfgDynamicOptions(pCfg->config);
#endif
......@@ -564,56 +564,41 @@ void * mgmtStatusTimer = NULL;
void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj);
char* taosBuildRspMsgToDnodeWithSizeImp(SDnodeObj *pObj, char type, int32_t size) {
char *pStart = (char *)malloc(size);
if (pStart == NULL) {
return NULL;
}
void mgmtSendMsgToDnodeImpFp(SSchedMsg *sched) {
int8_t msgType = *(sched->msg - 1);
int8_t *pCont = sched->msg;
int32_t contLen = (int32_t) sched->ahandle;
void *pConn = NULL;
*pStart = type;
return pStart + 1;
dnodeProcessMsgFromMgmt(pCont, contLen, msgType, pConn);
rpcFreeCont(sched->msg);
}
char* (*taosBuildRspMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size) = taosBuildRspMsgToDnodeWithSizeImp;
char* taosBuildReqMsgToDnodeWithSizeImp(SDnodeObj *pObj, char type, int32_t size) {
char *pStart = (char *)malloc(size);
if (pStart == NULL) {
return NULL;
}
*pStart = type;
return pStart + 1;
}
char* (*taosBuildReqMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size) = taosBuildReqMsgToDnodeWithSizeImp;
int32_t mgmtSendMsgToDnodeImp(int8_t *pCont, int32_t contLen, int8_t msgType) {
mTrace("msg:%s is sent to dnode", taosMsg[msgType]);
*(pCont-1) = msgType;
char *taosBuildRspMsgToDnode(SDnodeObj *pObj, char type) {
return taosBuildRspMsgToDnodeWithSize(pObj, type, 256);
}
SSchedMsg schedMsg;
schedMsg.fp = mgmtSendMsgToDnodeImpFp;
schedMsg.msg = pCont;
schedMsg.ahandle = (void*)contLen;
schedMsg.thandle = NULL;
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
char *taosBuildReqMsgToDnode(SDnodeObj *pObj, char type) {
return taosBuildReqMsgToDnodeWithSize(pObj, type, 256);
return TSDB_CODE_SUCCESS;
}
int32_t taosSendSimpleRspToDnodeImp(SDnodeObj *pObj, char rsptype, char code) { return 0; }
int32_t (*taosSendSimpleRspToDnode)(SDnodeObj *pObj, char rsptype, char code) = taosSendSimpleRspToDnodeImp;
int32_t (*mgmtSendMsgToDnode)(SDnodeObj *pObj, char *msg, int msgLen) = mgmtSendMsgToDnodeImp;
int32_t taosSendMsgToDnodeImp(SDnodeObj *pObj, char *msg, int32_t msgLen) {
mTrace("msg:%s is sent to dnode", taosMsg[(uint8_t)(*(msg-1))]);
/*
* Lite version has no message header, so minus one
*/
SSchedMsg schedMsg;
schedMsg.fp = dnodeProcessMsgFromMgmtImp;
schedMsg.msg = msg - 1;
schedMsg.ahandle = NULL;
schedMsg.thandle = NULL;
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
int32_t mgmtSendSimpleRspToDnodeImp(int32_t msgType, int32_t code) {
int8_t *pCont = rpcMallocCont(sizeof(int32_t));
*(int32_t *) pCont = code;
return 0;
mgmtSendMsgToDnodeImp(pCont, sizeof(int32_t), msgType);
return TSDB_CODE_SUCCESS;
}
int32_t (*taosSendMsgToDnode)(SDnodeObj *pObj, char *msg, int msgLen) = taosSendMsgToDnodeImp;
int32_t (*mgmtSendSimpleRspToDnode)(int32_t msgType, int32_t code) = mgmtSendSimpleRspToDnodeImp;
int32_t mgmtInitDnodeIntImp() { return 0; }
int32_t (*mgmtInitDnodeInt)() = mgmtInitDnodeIntImp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册