From 4b7baac9cfdd5b118d6be3726ad3b6c6ac180496 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Mar 2022 14:45:45 +0800 Subject: [PATCH] shm --- source/dnode/mgmt/container/inc/dndInt.h | 2 +- source/dnode/mgmt/dnode/inc/dmWorker.h | 2 +- source/dnode/mgmt/dnode/src/dmMsg.c | 38 +++---- source/dnode/mgmt/dnode/src/dmWorker.c | 4 +- source/dnode/mgmt/mnode/inc/mmWorker.h | 6 +- source/dnode/mgmt/mnode/src/mmMsg.c | 136 +++++++++++------------ source/dnode/mgmt/mnode/src/mmWorker.c | 9 +- source/dnode/mgmt/vnode/inc/vmWorker.h | 8 +- source/dnode/mgmt/vnode/src/vmMsg.c | 74 ++++++------ source/dnode/mgmt/vnode/src/vmWorker.c | 90 +++++++-------- 10 files changed, 176 insertions(+), 193 deletions(-) diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index e219a82764..cb66823c5e 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -72,7 +72,7 @@ typedef struct SQnodeMgmt SQnodeMgmt; typedef struct SSnodeMgmt SSnodeMgmt; typedef struct SBnodeMgmt SBnodeMgmt; -typedef int32_t (*NodeMsgFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +typedef int32_t (*NodeMsgFp)(void *pMgmt, SNodeMsg *pMsg); typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper); typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper); typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper); diff --git a/source/dnode/mgmt/dnode/inc/dmWorker.h b/source/dnode/mgmt/dnode/inc/dmWorker.h index 611a5e7c67..bdd994dfaf 100644 --- a/source/dnode/mgmt/dnode/inc/dmWorker.h +++ b/source/dnode/mgmt/dnode/inc/dmWorker.h @@ -24,7 +24,7 @@ extern "C" { int32_t dmStartWorker(SDnodeMgmt *pMgmt); void dmStopWorker(SDnodeMgmt *pMgmt); -int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/dnode/src/dmMsg.c b/source/dnode/mgmt/dnode/src/dmMsg.c index d2541f74e2..931c0eb5a4 100644 --- a/source/dnode/mgmt/dnode/src/dmMsg.c +++ b/source/dnode/mgmt/dnode/src/dmMsg.c @@ -104,25 +104,25 @@ int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq) { void dmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg); // Requests handled by MNODE - dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg); } diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index e3c32225d5..d39d5fb15b 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -151,9 +151,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) { } } -int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SDnodeMgmt *pMgmt = pWrapper->pMgmt; - +int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { SDnodeWorker *pWorker = &pMgmt->mgmtWorker; if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) { pWorker = &pMgmt->statusWorker; diff --git a/source/dnode/mgmt/mnode/inc/mmWorker.h b/source/dnode/mgmt/mnode/inc/mmWorker.h index f86b484699..45a737b9de 100644 --- a/source/dnode/mgmt/mnode/inc/mmWorker.h +++ b/source/dnode/mgmt/mnode/inc/mmWorker.h @@ -24,9 +24,9 @@ extern "C" { int32_t mmStartWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt); -int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index bbb49848cc..dccd49fe9d 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -122,76 +122,76 @@ int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { void mmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); // Requests handled by MNODE - dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg); // Requests handled by VNODE - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); } diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 0c12039b41..52cfc98682 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -94,18 +94,15 @@ static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeM return code; } -int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg); } -int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg); } -int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); } diff --git a/source/dnode/mgmt/vnode/inc/vmWorker.h b/source/dnode/mgmt/vnode/inc/vmWorker.h index 005615c23e..7278c4c83e 100644 --- a/source/dnode/mgmt/vnode/inc/vmWorker.h +++ b/source/dnode/mgmt/vnode/inc/vmWorker.h @@ -30,10 +30,10 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToApplyQueue(SVnodesMgmt *pMgmt, int32_t vgId, SRpcMsg *pMsg); -int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); +int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); +int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); +int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 1251fdbc7e..956e55557c 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -232,41 +232,41 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) { void vmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by VNODE - dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg); - - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessCreateVnodeReq); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, vmProcessAlterVnodeReq); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessDropVnodeReq); - dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, vmProcessSyncVnodeReq); - dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, vmProcessCompactVnodeReq); + dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg); + + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessCreateVnodeReq); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessAlterVnodeReq); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessDropVnodeReq); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessSyncVnodeReq); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessCompactVnodeReq); } diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 0ae72b6479..3c42d7b094 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -80,35 +80,23 @@ static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOf } } -static int32_t vmWriteMsgToQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, bool sendRsp) { - int32_t code = 0; - - if (pQueue == NULL) { - code = TSDB_CODE_MSG_NOT_PROCESSED; - } else { - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); - if (pMsg == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - } else { - *pMsg = *pRpcMsg; - if (taosWriteQitem(pQueue, pMsg) != 0) { - code = TSDB_CODE_OUT_OF_MEMORY; - } - } - } +static int32_t vmWriteMsgToQueue(STaosQueue *pQueue, SNodeMsg *pMsg, bool sendRsp) { + int32_t code = taosWriteQitem(pQueue, pMsg); if (code != TSDB_CODE_SUCCESS && sendRsp) { - if (pRpcMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; + if (pMsg->rpcMsg.msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .code = code}; rpcSendResponse(&rsp); } - rpcFreeCont(pRpcMsg->pCont); + rpcFreeCont(pMsg->rpcMsg.pCont); } return code; } -static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SRpcMsg *pMsg) { +static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) { + SRpcMsg *pMsg = &pNodeMsg->rpcMsg; + SMsgHead *pHead = pMsg->pCont; pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); @@ -126,50 +114,50 @@ static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SRpcMsg *pMsg) { return pVnode; } -int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { -// SVnodeObj *pVnode = vmAcquireFromMsg(pDnode, pMsg); -// if (pVnode != NULL) { -// (void)vmWriteMsgToQueue(pVnode->pWriteQ, pMsg, true); -// vmReleaseVnode(pMgmt, pVnode); -// } -return 0; +int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); + if (pVnode != NULL) { + (void)vmWriteMsgToQueue(pVnode->pWriteQ, pMsg, true); + vmReleaseVnode(pMgmt, pVnode); + } } -int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { -// SVnodeObj *pVnode = vmAcquireFromMsg(pDnode, pMsg); -// if (pVnode != NULL) { -// (void)vmWriteMsgToQueue(pVnode->pSyncQ, pMsg, true); -// vmReleaseVnode(pMgmt, pVnode); -// } -return 0; +int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); + if (pVnode != NULL) { + (void)vmWriteMsgToQueue(pVnode->pSyncQ, pMsg, true); + vmReleaseVnode(pMgmt, pVnode); + } } -int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { -// SVnodeObj *pVnode = vmAcquireFromMsg(pDnode, pMsg); -// if (pVnode != NULL) { -// (void)vmWriteMsgToQueue(pVnode->pQueryQ, pMsg, true); -// vmReleaseVnode(pMgmt, pVnode); -// } -return 0; +int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); + if (pVnode != NULL) { + (void)vmWriteMsgToQueue(pVnode->pQueryQ, pMsg, true); + vmReleaseVnode(pMgmt, pVnode); + } } -int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){ -// SVnodeObj *pVnode = vmAcquireFromMsg(pDnode, pMsg); -// if (pVnode != NULL) { -// (void)vmWriteMsgToQueue(pVnode->pFetchQ, pMsg, true); -// vmReleaseVnode(pMgmt, pVnode); -// } -return 0; +int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); + if (pVnode != NULL) { + (void)vmWriteMsgToQueue(pVnode->pFetchQ, pMsg, true); + vmReleaseVnode(pMgmt, pVnode); + } } -int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pMsg) { - SMsgHead *pHead = pMsg->pCont; +int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pRpc) { + int32_t code = -1; + SMsgHead *pHead = pRpc->pCont; // pHead->vgId = htonl(pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) return -1; - int32_t code = vmWriteMsgToQueue(pVnode->pQueryQ, pMsg, false); + SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); + if (pMsg != NULL) { + code = vmWriteMsgToQueue(pVnode->pQueryQ, pMsg, false); + } vmReleaseVnode(pMgmt, pVnode); return code; } -- GitLab