diff --git a/src/dnode/inc/dnodeMPeer.h b/src/dnode/inc/dnodeMPeer.h index 0015532f152651a6dd6c87688cb62dbfdc07aa2b..cdbb4a210c959698b7e25aaaa7385b33c2556a08 100644 --- a/src/dnode/inc/dnodeMPeer.h +++ b/src/dnode/inc/dnodeMPeer.h @@ -22,6 +22,8 @@ extern "C" { int32_t dnodeInitMnodePeer(); void dnodeCleanupMnodePeer(); +int32_t dnodeAllocateMnodePqueue(); +void dnodeFreeMnodePqueue(); void dnodeDispatchToMnodePeerQueue(SRpcMsg *pMsg); #ifdef __cplusplus diff --git a/src/dnode/inc/dnodeMRead.h b/src/dnode/inc/dnodeMRead.h index 0b340a865f10715aaf686d507f5a5f87dd997139..4e93838b7998850e1bec79f017fdfbd28f286a11 100644 --- a/src/dnode/inc/dnodeMRead.h +++ b/src/dnode/inc/dnodeMRead.h @@ -22,6 +22,8 @@ extern "C" { int32_t dnodeInitMnodeRead(); void dnodeCleanupMnodeRead(); +int32_t dnodeAllocateMnodeRqueue(); +void dnodeFreeMnodeRqueue(); void dnodeDispatchToMnodeReadQueue(SRpcMsg *rpcMsg); #ifdef __cplusplus diff --git a/src/dnode/inc/dnodeMWrite.h b/src/dnode/inc/dnodeMWrite.h index 7a3ec93446d5718c9194c92e0eea26876ab386c6..498fea81c59329b4d30874d36d10182b6e3ae54f 100644 --- a/src/dnode/inc/dnodeMWrite.h +++ b/src/dnode/inc/dnodeMWrite.h @@ -22,6 +22,8 @@ extern "C" { int32_t dnodeInitMnodeWrite(); void dnodeCleanupMnodeWrite(); +int32_t dnodeAllocateMnodeWqueue(); +void dnodeFreeMnodeWqueue(); void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg); #ifdef __cplusplus diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index 53e664b58b62890cb85d0c1dca5aac19ffd9b9b1..bde6cd2aa74b0cd3a234fb339ee43665c4587683 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -28,8 +28,7 @@ #include "dnodeInt.h" #include "dnodeMgmt.h" #include "dnodeVWrite.h" -#include "dnodeMRead.h" -#include "dnodeMWrite.h" +#include "dnodeMPeer.h" extern void dnodeUpdateMnodeIpSetForPeer(SRpcIpSet *pIpSet); static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); @@ -50,11 +49,11 @@ int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToDnodeMgmt; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToDnodeMgmt; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodeReadQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodeReadQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMnodeReadQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMnodePeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMnodePeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMnodePeerQueue; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -103,16 +102,14 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { rpcSendResponse(&rspMsg); return; } - + if (dnodeProcessReqMsgFp[pMsg->msgType]) { (*dnodeProcessReqMsgFp[pMsg->msgType])(pMsg); } else { + dTrace("RPC %p, message:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); - dTrace("RPC %p, message:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); - return; - } } @@ -148,13 +145,14 @@ void dnodeCleanupClient() { } static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { - if (dnodeProcessRspMsgFp[pMsg->msgType]) { - if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pIpSet) { - dnodeUpdateMnodeIpSetForPeer(pIpSet); - } + if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pIpSet) { + dnodeUpdateMnodeIpSetForPeer(pIpSet); + } + + if (dnodeProcessRspMsgFp[pMsg->msgType]) { (*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg); } else { - dError("RPC %p, msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); + mnodeProcessPeerRsp(pMsg); } rpcFreeCont(pMsg->pCont); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 68dfaef4084c7be58d60857036db2915647b1113..bf40dd432608ddc0d4be44f678bee23dc199b96e 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -61,9 +61,10 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMnodeWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMnodeWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMnodeWriteQueue; - + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMnodeWriteQueue; + // the following message shall be treated as mnode query + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMnodeReadQueue; @@ -71,7 +72,6 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMnodeReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMnodeReadQueue; int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); diff --git a/src/mnode/inc/mnodePeer.h b/src/mnode/inc/mnodePeer.h index c81617266d7353f27367ad75a93a6890e289410b..e409d90de96c4e6f0e37ca48ce212ec38efb5706 100644 --- a/src/mnode/inc/mnodePeer.h +++ b/src/mnode/inc/mnodePeer.h @@ -24,6 +24,7 @@ extern "C" { void mnodeAddPeerRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void mnodeAddPeerMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg)); int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg); +void mnodeProcessPeerRsp(SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index a4bb28b02cef10cd9211a93391dd63a90154e59e..e17c52a0b1ce13266a1d384ed885353e285044fc 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -81,6 +81,4 @@ void mnodeProcessPeerRsp(SRpcMsg *pMsg) { } else { mError("msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); } - - rpcFreeCont(pMsg->pCont); }