From d1003fedff1983e5165e931e03ebb994b0b9d68d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 27 May 2020 10:57:38 +0000 Subject: [PATCH] [TD-335] message put into peer queue --- src/dnode/inc/dnodeMPeer.h | 2 ++ src/dnode/inc/dnodeMRead.h | 2 ++ src/dnode/inc/dnodeMWrite.h | 2 ++ src/dnode/src/dnodePeer.c | 30 ++++++++++++++---------------- src/dnode/src/dnodeShell.c | 6 +++--- src/mnode/inc/mnodePeer.h | 1 + src/mnode/src/mnodePeer.c | 2 -- 7 files changed, 24 insertions(+), 21 deletions(-) diff --git a/src/dnode/inc/dnodeMPeer.h b/src/dnode/inc/dnodeMPeer.h index 0015532f15..cdbb4a210c 100644 --- a/src/dnode/inc/dnodeMPeer.h +++ b/src/dnode/inc/dnodeMPeer.h @@ -22,6 +22,8 @@ extern "C" { int32_t dnodeInitMnodePeer(); void dnodeCleanupMnodePeer(); +int32_t dnodeAllocateMnodePqueue(); +void dnodeFreeMnodePqueue(); void dnodeDispatchToMnodePeerQueue(SRpcMsg *pMsg); #ifdef __cplusplus diff --git a/src/dnode/inc/dnodeMRead.h b/src/dnode/inc/dnodeMRead.h index 0b340a865f..4e93838b79 100644 --- a/src/dnode/inc/dnodeMRead.h +++ b/src/dnode/inc/dnodeMRead.h @@ -22,6 +22,8 @@ extern "C" { int32_t dnodeInitMnodeRead(); void dnodeCleanupMnodeRead(); +int32_t dnodeAllocateMnodeRqueue(); +void dnodeFreeMnodeRqueue(); void dnodeDispatchToMnodeReadQueue(SRpcMsg *rpcMsg); #ifdef __cplusplus diff --git a/src/dnode/inc/dnodeMWrite.h b/src/dnode/inc/dnodeMWrite.h index 7a3ec93446..498fea81c5 100644 --- a/src/dnode/inc/dnodeMWrite.h +++ b/src/dnode/inc/dnodeMWrite.h @@ -22,6 +22,8 @@ extern "C" { int32_t dnodeInitMnodeWrite(); void dnodeCleanupMnodeWrite(); +int32_t dnodeAllocateMnodeWqueue(); +void dnodeFreeMnodeWqueue(); void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg); #ifdef __cplusplus diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index 53e664b58b..bde6cd2aa7 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -28,8 +28,7 @@ #include "dnodeInt.h" #include "dnodeMgmt.h" #include "dnodeVWrite.h" -#include "dnodeMRead.h" -#include "dnodeMWrite.h" +#include "dnodeMPeer.h" extern void dnodeUpdateMnodeIpSetForPeer(SRpcIpSet *pIpSet); static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); @@ -50,11 +49,11 @@ int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToDnodeMgmt; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToDnodeMgmt; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodeReadQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodeReadQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMnodeReadQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMnodePeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMnodePeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMnodePeerQueue; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -103,16 +102,14 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { rpcSendResponse(&rspMsg); return; } - + if (dnodeProcessReqMsgFp[pMsg->msgType]) { (*dnodeProcessReqMsgFp[pMsg->msgType])(pMsg); } else { + dTrace("RPC %p, message:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); - dTrace("RPC %p, message:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); - return; - } } @@ -148,13 +145,14 @@ void dnodeCleanupClient() { } static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { - if (dnodeProcessRspMsgFp[pMsg->msgType]) { - if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pIpSet) { - dnodeUpdateMnodeIpSetForPeer(pIpSet); - } + if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pIpSet) { + dnodeUpdateMnodeIpSetForPeer(pIpSet); + } + + if (dnodeProcessRspMsgFp[pMsg->msgType]) { (*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg); } else { - dError("RPC %p, msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); + mnodeProcessPeerRsp(pMsg); } rpcFreeCont(pMsg->pCont); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 68dfaef408..bf40dd4326 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -61,9 +61,10 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMnodeWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMnodeWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMnodeWriteQueue; - + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMnodeWriteQueue; + // the following message shall be treated as mnode query + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMnodeReadQueue; @@ -71,7 +72,6 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMnodeReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMnodeReadQueue; int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); diff --git a/src/mnode/inc/mnodePeer.h b/src/mnode/inc/mnodePeer.h index c81617266d..e409d90de9 100644 --- a/src/mnode/inc/mnodePeer.h +++ b/src/mnode/inc/mnodePeer.h @@ -24,6 +24,7 @@ extern "C" { void mnodeAddPeerRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void mnodeAddPeerMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg)); int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg); +void mnodeProcessPeerRsp(SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index a4bb28b02c..e17c52a0b1 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -81,6 +81,4 @@ void mnodeProcessPeerRsp(SRpcMsg *pMsg) { } else { mError("msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); } - - rpcFreeCont(pMsg->pCont); } -- GitLab