提交 9f3ef6cd 编写于 作者: S Shengliang Guan

adjust rpc code after change msgtype from int8 to int16

上级 4c727870
......@@ -48,6 +48,7 @@ extern int tMsgDict[];
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
#define TMSG_INFO(TYPE) tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)]
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
typedef uint16_t tmsg_t;
......
......@@ -63,7 +63,7 @@ void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
}
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t msgType = pMsg->msgType;
tmsg_t msgType = pMsg->msgType;
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
......@@ -369,7 +369,7 @@ void dndSendStatusMsg(SDnode *pDnode) {
dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads);
contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad);
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS};
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = 9527};
pMgmt->statusSent = 1;
dTrace("pDnode:%p, send status msg to mnode", pDnode);
......
......@@ -31,104 +31,104 @@
static void dndInitMsgFp(STransMgmt *pMgmt) {
// msg from client to dnode
pMgmt->msgFp[TDMT_VND_SUBMIT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TDMT_VND_QUERY] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TDMT_VND_FETCH] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TDMT_MND_CREATE_TABLE] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_DROP_TABLE] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TDMT_VND_ALTER_TABLE] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TDMT_VND_UPDATE_TAG_VAL] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TDMT_VND_TABLE_META] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TDMT_VND_TABLES_META] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TDMT_VND_MQ_QUERY] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TDMT_VND_MQ_CONSUME] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TDMT_VND_MQ_CONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TDMT_VND_MQ_DISCONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TDMT_VND_MQ_SET_CUR] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TDMT_VND_RES_READY] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TDMT_VND_TASKS_STATUS] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TDMT_VND_CANCEL_TASK] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TDMT_VND_DROP_TASK] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_DISCONNECT)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_RES_READY)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASKS_STATUS)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CANCEL_TASK)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TASK)] = dndProcessVnodeFetchMsg;
// msg from client to mnode
pMgmt->msgFp[TDMT_MND_CONNECT] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TDMT_MND_CREATE_ACCT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_ALTER_ACCT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_DROP_ACCT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_CREATE_USER] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_ALTER_USER] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_DROP_USER] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_CREATE_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_CONFIG_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_DROP_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_CREATE_MNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_DROP_MNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_CREATE_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_DROP_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_USE_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_ALTER_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_SYNC_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_COMPACT_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_CREATE_FUNCTION] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_RETRIEVE_FUNCTION] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_DROP_FUNCTION] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_CREATE_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_ALTER_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_DROP_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_VGROUP_LIST] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TDMT_MND_KILL_QUERY] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_KILL_CONN] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_HEARTBEAT] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TDMT_MND_SHOW] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TDMT_MND_SHOW_RETRIEVE] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONNECT)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_ACCT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_ACCT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_ACCT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_USER)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_USER)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_USER)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DNODE)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONFIG_DNODE)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DNODE)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_MNODE)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_MNODE)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_USE_DB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_DB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYNC_DB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_COMPACT_DB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_FUNCTION)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNCTION)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_FUNCTION)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
// message from client to dnode
pMgmt->msgFp[TDMT_DND_NETWORK_TEST] = dndProcessDnodeReq;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessDnodeReq;
// message from mnode to vnode
pMgmt->msgFp[TDMT_VND_CREATE_STB] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TDMT_VND_CREATE_STB_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_VND_ALTER_STB] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TDMT_VND_ALTER_STB_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_VND_DROP_STB] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TDMT_VND_DROP_STB_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = dndProcessMnodeWriteMsg;
// message from mnode to dnode
pMgmt->msgFp[TDMT_DND_CREATE_VNODE] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TDMT_DND_CREATE_VNODE_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_DND_ALTER_VNODE] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TDMT_DND_ALTER_VNODE_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_DND_DROP_VNODE] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TDMT_DND_DROP_VNODE_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_DND_SYNC_VNODE] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TDMT_DND_SYNC_VNODE_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_DND_AUTH_VNODE] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TDMT_DND_AUTH_VNODE_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_DND_COMPACT_VNODE] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TDMT_DND_COMPACT_VNODE_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_DND_CREATE_MNODE] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TDMT_DND_CREATE_MNODE_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_DND_ALTER_MNODE] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TDMT_DND_ALTER_MNODE_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_DND_DROP_MNODE] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TDMT_DND_DROP_MNODE_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_DND_CONFIG_DNODE] = dndProcessDnodeReq;
pMgmt->msgFp[TDMT_DND_CONFIG_DNODE_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessDnodeReq;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = dndProcessMnodeWriteMsg;
// message from dnode to mnode
pMgmt->msgFp[TDMT_MND_GRANT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_GRANT_RSP] = dndProcessDnodeRsp;
pMgmt->msgFp[TDMT_MND_STATUS] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TDMT_MND_STATUS_RSP] = dndProcessDnodeRsp;
pMgmt->msgFp[TDMT_MND_AUTH] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TDMT_MND_AUTH_RSP] = dndProcessDnodeRsp;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessDnodeRsp;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessDnodeRsp;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessDnodeRsp;
}
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->tmgmt;
int32_t msgType = pMsg->msgType;
tmsg_t msgType = pMsg->msgType;
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
......@@ -137,7 +137,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
return;
}
DndMsgFp fp = pMgmt->msgFp[msgType];
DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)];
if (fp != NULL) {
(*fp)(pDnode, pMsg, pEpSet);
dTrace("RPC %p, rsp:%s is processed, code:0x%x", pMsg->handle, TMSG_INFO(msgType), pMsg->code & 0XFFFF);
......@@ -186,7 +186,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->tmgmt;
int32_t msgType = pMsg->msgType;
tmsg_t msgType = pMsg->msgType;
if (msgType == TDMT_DND_NETWORK_TEST) {
dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code);
dndProcessDnodeReq(pDnode, pMsg, pEpSet);
......@@ -214,7 +214,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
return;
}
DndMsgFp fp = pMgmt->msgFp[msgType];
DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)];
if (fp != NULL) {
dTrace("RPC %p, req:%s app:%p will be processed", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle);
(*fp)(pDnode, pMsg, pEpSet);
......
......@@ -87,7 +87,7 @@ typedef struct SMnode {
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg);
void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg);
void mndSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp);
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
uint64_t mndGenerateUid(char *name, int32_t len) ;
......
......@@ -24,7 +24,7 @@ extern "C" {
typedef struct {
SEpSet epSet;
int8_t msgType;
tmsg_t msgType;
int8_t msgSent;
int8_t msgReceived;
int32_t errCode;
......
......@@ -132,7 +132,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet));
SDB_SET_INT8(pRaw, dataPos, pAction->msgType)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen);
}
......@@ -140,7 +140,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet));
SDB_SET_INT8(pRaw, dataPos, pAction->msgType)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen);
}
......@@ -243,7 +243,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction action = {0};
SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet));
SDB_GET_INT8(pRaw, pRow, dataPos, &action.msgType)
SDB_GET_INT16(pRaw, pRow, dataPos, &action.msgType)
SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen)
action.pCont = malloc(action.contLen);
if (action.pCont == NULL) {
......@@ -262,7 +262,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction action = {0};
SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet));
SDB_GET_INT8(pRaw, pRow, dataPos, &action.msgType)
SDB_GET_INT16(pRaw, pRow, dataPos, &action.msgType)
SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen)
action.pCont = malloc(action.contLen);
if (action.pCont == NULL) {
......
......@@ -374,7 +374,7 @@ void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {
static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
int32_t code = 0;
int32_t msgType = pMsg->rpcMsg.msgType;
tmsg_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle;
bool isReq = (msgType & 1U);
......@@ -392,10 +392,10 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
goto PROCESS_RPC_END;
}
MndMsgFp fp = pMnode->msgFp[msgType];
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(msgType)];
if (fp == NULL) {
code = TSDB_CODE_MSG_NOT_PROCESSED;
mError("msg:%p, app:%p failed to process since not handle", pMsg, ahandle);
mError("msg:%p, app:%p failed to process since no handle", pMsg, ahandle);
goto PROCESS_RPC_END;
}
......@@ -425,9 +425,10 @@ PROCESS_RPC_END:
}
}
void mndSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp) {
if (msgType >= 0 && msgType < TDMT_MAX) {
pMnode->msgFp[msgType] = fp;
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
tmsg_t type = TMSG_INDEX(msgType);
if (type >= 0 && type < TDMT_MAX) {
pMnode->msgFp[type] = fp;
}
}
......
......@@ -52,7 +52,7 @@ typedef struct {
char user[TSDB_UNI_LEN]; // user ID
uint16_t port; // for UDP only, port may be changed
char empty[1]; // reserved
uint8_t msgType; // message type
uint16_t msgType; // message type
int32_t msgLen; // message length including the header iteslf
uint32_t msgVer;
int32_t code; // code in response message
......
......@@ -74,7 +74,7 @@ typedef struct {
SEpSet epSet; // ip list provided by app
void *ahandle; // handle provided by app
struct SRpcConn *pConn; // pConn allocated
char msgType; // message type
tmsg_t msgType; // message type
uint8_t *pCont; // content provided by app
int32_t contLen; // content length
int32_t code; // error code
......@@ -108,8 +108,8 @@ typedef struct SRpcConn {
uint16_t tranId; // outgoing transcation ID, for build message
uint16_t outTranId; // outgoing transcation ID
uint16_t inTranId; // transcation ID for incoming msg
uint8_t outType; // message type for outgoing request
uint8_t inType; // message type for incoming request
tmsg_t outType; // message type for outgoing request
tmsg_t inType; // message type for incoming request
void *chandle; // handle passed by TCP/UDP connection layer
void *ahandle; // handle provided by upper app layter
int retry; // number of retry for sending request
......@@ -409,7 +409,7 @@ void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t
// connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection
char type = pMsg->msgType;
tmsg_t type = pMsg->msgType;
if (type == TDMT_VND_QUERY || type == TDMT_MND_SHOW_RETRIEVE
|| type == TDMT_VND_FETCH || type == TDMT_MND_VGROUP_LIST
|| type == TDMT_VND_TABLES_META || type == TDMT_VND_TABLE_META
......@@ -957,7 +957,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
sid = htonl(pHead->destId);
*ppContext = NULL;
if (pHead->msgType >= TDMT_MAX || pHead->msgType <= 0) {
if (TMSG_INDEX(pHead->msgType) >= TDMT_MAX || TMSG_INDEX(pHead->msgType) <= 0) {
tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE; return NULL;
}
......@@ -1094,7 +1094,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
SRpcReqContext *pContext;
pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
if (pHead->msgType >= 1 && pHead->msgType < TDMT_MAX) {
if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) {
tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), pRecv->ip, pRecv->port, terrno, pRecv->msgLen,
pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
......@@ -1112,11 +1112,11 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE) {
rpcCloseConn(pConn);
}
if (pHead->msgType + 1 > 1 && pHead->msgType+1 < TDMT_MAX) {
if (TMSG_INDEX(pHead->msgType) + 1 > 1 && TMSG_INDEX(pHead->msgType) + 1 < TDMT_MAX) {
tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType+1), code);
} else {
tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), code);
}
}
}
} else { // msg is passed to app only parsing is ok
rpcProcessIncomingMsg(pConn, pHead, pContext);
......@@ -1262,7 +1262,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
memset(msg, 0, sizeof(SRpcHead));
pReplyHead->version = pRecvHead->version;
pReplyHead->msgType = (char)(pRecvHead->msgType + 1);
pReplyHead->msgType = (tmsg_t)(pRecvHead->msgType + 1);
pReplyHead->spi = 0;
pReplyHead->encrypt = pRecvHead->encrypt;
pReplyHead->tranId = pRecvHead->tranId;
......@@ -1292,7 +1292,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcHead *pHead = rpcHeadFromCont(pContext->pCont);
char *msg = (char *)pHead;
int msgLen = rpcMsgLenFromCont(pContext->contLen);
char msgType = pContext->msgType;
tmsg_t msgType = pContext->msgType;
pContext->numOfTry++;
SRpcConn *pConn = rpcSetupConnToServer(pContext);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册