From 7e361ebcd09a9064fed94b65a3bfdff34f30bf8f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 2 Apr 2022 19:52:04 +0800 Subject: [PATCH] refact dm --- include/util/tdef.h | 6 +- source/dnode/mgmt/dm/inc/dmInt.h | 7 +- source/dnode/mgmt/dm/src/dmHandle.c | 37 ++--- source/dnode/mgmt/dm/src/dmInt.c | 2 +- source/dnode/mgmt/dm/src/dmWorker.c | 53 +++---- source/dnode/mgmt/main/src/dndInt.c | 7 + source/dnode/mgmt/main/src/dndTransport.c | 8 +- source/dnode/mgmt/mm/src/mmHandle.c | 162 ++++++++++----------- source/dnode/mgmt/qm/src/qmHandle.c | 18 +-- source/dnode/mgmt/sm/src/smHandle.c | 4 +- source/dnode/mgmt/vm/src/vmHandle.c | 90 ++++++------ source/dnode/mnode/impl/src/mndQuery.c | 2 +- source/libs/planner/src/planPhysiCreater.c | 2 +- 13 files changed, 201 insertions(+), 197 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index be88afcf66..f1af9eca1f 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -480,9 +480,9 @@ enum { SND_WORKER_TYPE__UNIQUE, }; -#define MND_VGID -1 -#define QND_VGID 1 -#define VND_VGID 0 +#define MNODE_HANDLE -1 +#define QNODE_HANDLE 1 +#define DEFAULT_HANDLE 0 #define MAX_NUM_STR_SIZE 40 diff --git a/source/dnode/mgmt/dm/inc/dmInt.h b/source/dnode/mgmt/dm/inc/dmInt.h index a8f46ef7e2..98abb8ef54 100644 --- a/source/dnode/mgmt/dm/inc/dmInt.h +++ b/source/dnode/mgmt/dm/inc/dmInt.h @@ -45,7 +45,7 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt); void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps); // dmMsg.c -void dmInitMsgHandles(SMgmtWrapper *pWrapper); +void dmInitMsgHandle(SMgmtWrapper *pWrapper); void dmSendStatusReq(SDnodeMgmt *pMgmt); int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); @@ -53,10 +53,11 @@ int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); // dmWorker.c +int32_t dmStartThread(SDnodeMgmt *pMgmt); int32_t dmStartWorker(SDnodeMgmt *pMgmt); void dmStopWorker(SDnodeMgmt *pMgmt); -int32_t dmStartThread(SDnodeMgmt *pMgmt); -int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t dmPutMsgToMgmtWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t dmPutMsgToStatusWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/dm/src/dmHandle.c b/source/dnode/mgmt/dm/src/dmHandle.c index 979db7aac0..07eca3f230 100644 --- a/source/dnode/mgmt/dm/src/dmHandle.c +++ b/source/dnode/mgmt/dm/src/dmHandle.c @@ -195,14 +195,7 @@ int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) { } } -void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) { - SStartupReq *pStartup = &pDnode->startup; - tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); - tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); - pStartup->finished = 0; -} - -void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) { +static void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) { memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq)); pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING); } @@ -218,21 +211,21 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { rpcSendResponse(&rpcRsp); } -void dmInitMsgHandles(SMgmtWrapper *pWrapper) { +void dmInitMsgHandle(SMgmtWrapper *pWrapper) { // Requests handled by DNODE - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmPutMsgToMgmtWorker, DEFAULT_HANDLE); // Requests handled by MNODE - dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmPutMsgToStatusWorker, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmPutMsgToMgmtWorker, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmPutMsgToMgmtWorker, DEFAULT_HANDLE); } diff --git a/source/dnode/mgmt/dm/src/dmInt.c b/source/dnode/mgmt/dm/src/dmInt.c index 8c46c402ab..1e38b8304d 100644 --- a/source/dnode/mgmt/dm/src/dmInt.c +++ b/source/dnode/mgmt/dm/src/dmInt.c @@ -165,7 +165,7 @@ void dmGetMgmtFp(SMgmtWrapper *pWrapper) { mgmtFp.startFp = dmStart; mgmtFp.requiredFp = dmRequire; - dmInitMsgHandles(pWrapper); + dmInitMsgHandle(pWrapper); pWrapper->name = "dnode"; pWrapper->fp = mgmtFp; } diff --git a/source/dnode/mgmt/dm/src/dmWorker.c b/source/dnode/mgmt/dm/src/dmWorker.c index 9045a44232..6061189ca1 100644 --- a/source/dnode/mgmt/dm/src/dmWorker.c +++ b/source/dnode/mgmt/dm/src/dmWorker.c @@ -18,7 +18,7 @@ static void *dmThreadRoutine(void *param) { SDnodeMgmt *pMgmt = param; - SDnode * pDnode = pMgmt->pDnode; + SDnode *pDnode = pMgmt->pDnode; int64_t lastStatusTime = taosGetTimestampMs(); int64_t lastMonitorTime = lastStatusTime; @@ -32,8 +32,7 @@ static void *dmThreadRoutine(void *param) { } int64_t curTime = taosGetTimestampMs(); - - float statusInterval = (curTime - lastStatusTime) / 1000.0f; + float statusInterval = (curTime - lastStatusTime) / 1000.0f; if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) { dmSendStatusReq(pMgmt); lastStatusTime = curTime; @@ -47,10 +46,21 @@ static void *dmThreadRoutine(void *param) { } } +int32_t dmStartThread(SDnodeMgmt *pMgmt) { + pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt); + if (pMgmt->threadId == NULL) { + dError("failed to init dnode thread"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SDnodeMgmt *pMgmt = pInfo->ahandle; - SDnode * pDnode = pMgmt->pDnode; + SDnode *pDnode = pMgmt->pDnode; SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; dTrace("msg:%p, will be processed in dnode queue", pMsg); @@ -95,16 +105,14 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { } int32_t dmStartWorker(SDnodeMgmt *pMgmt) { - SSingleWorkerCfg mgmtCfg = { - .min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; - if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) { + SSingleWorkerCfg mcfg = {.min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mcfg) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } - SSingleWorkerCfg statusCfg = { - .min = 1, .max = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; - if (tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg) != 0) { + SSingleWorkerCfg scfg = {.min = 1, .max = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->statusWorker, &scfg) != 0) { dError("failed to start dnode status worker since %s", terrstr()); return -1; } @@ -113,17 +121,6 @@ int32_t dmStartWorker(SDnodeMgmt *pMgmt) { return 0; } -int32_t dmStartThread(SDnodeMgmt *pMgmt) { - pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt); - if (pMgmt->threadId == NULL) { - dError("failed to init dnode thread"); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - void dmStopWorker(SDnodeMgmt *pMgmt) { tSingleWorkerCleanup(&pMgmt->mgmtWorker); tSingleWorkerCleanup(&pMgmt->statusWorker); @@ -135,12 +132,18 @@ void dmStopWorker(SDnodeMgmt *pMgmt) { dDebug("dnode workers are closed"); } -int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { +int32_t dmPutMsgToMgmtWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SDnodeMgmt *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->mgmtWorker; - if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) { - pWorker = &pMgmt->statusWorker; - } + + dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); + taosWriteQitem(pWorker->queue, pMsg); + return 0; +} + +int32_t dmPutMsgToStatusWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SDnodeMgmt *pMgmt = pWrapper->pMgmt; + SSingleWorker *pWorker = &pMgmt->statusWorker; dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); diff --git a/source/dnode/mgmt/main/src/dndInt.c b/source/dnode/mgmt/main/src/dndInt.c index 5e4b0a1f7b..72feed7c62 100644 --- a/source/dnode/mgmt/main/src/dndInt.c +++ b/source/dnode/mgmt/main/src/dndInt.c @@ -204,3 +204,10 @@ void dndSetStatus(SDnode *pDnode, EDndStatus status) { pDnode->status = status; } } + +void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) { + SStartupReq *pStartup = &pDnode->startup; + tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); + tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); + pStartup->finished = 0; +} diff --git a/source/dnode/mgmt/main/src/dndTransport.c b/source/dnode/mgmt/main/src/dndTransport.c index 38bc13fec0..babc6d1209 100644 --- a/source/dnode/mgmt/main/src/dndTransport.c +++ b/source/dnode/mgmt/main/src/dndTransport.c @@ -137,9 +137,9 @@ static void dndProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { if (pHandle->pMndWrapper != NULL || pHandle->pQndWrapper != NULL) { SMsgHead *pHead = pMsg->pCont; int32_t vgId = ntohl(pHead->vgId); - if (vgId == QND_VGID) { + if (vgId == QNODE_HANDLE) { pWrapper = pHandle->pQndWrapper; - } else if (vgId == MND_VGID) { + } else if (vgId == MNODE_HANDLE) { pWrapper = pHandle->pMndWrapper; } else { } @@ -315,13 +315,13 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { if (msgFp == NULL) continue; SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex]; - if (vgId == QND_VGID) { + if (vgId == QNODE_HANDLE) { if (pHandle->pQndWrapper != NULL) { dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); return -1; } pHandle->pQndWrapper = pWrapper; - } else if (vgId == MND_VGID) { + } else if (vgId == MNODE_HANDLE) { if (pHandle->pMndWrapper != NULL) { dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); return -1; diff --git a/source/dnode/mgmt/mm/src/mmHandle.c b/source/dnode/mgmt/mm/src/mmHandle.c index 6afcd249b3..666f9daa91 100644 --- a/source/dnode/mgmt/mm/src/mmHandle.c +++ b/source/dnode/mgmt/mm/src/mmHandle.c @@ -75,91 +75,91 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { void mmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); // Requests handled by MNODE - dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); // Requests handled by VNODE - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, mmProcessQueryMsg, MND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, mmProcessQueryMsg, MND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, MND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, MND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, mmProcessQueryMsg, MNODE_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, mmProcessQueryMsg, MNODE_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, MNODE_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, MNODE_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, MNODE_HANDLE); } diff --git a/source/dnode/mgmt/qm/src/qmHandle.c b/source/dnode/mgmt/qm/src/qmHandle.c index da5ba6472a..14ac08af05 100644 --- a/source/dnode/mgmt/qm/src/qmHandle.c +++ b/source/dnode/mgmt/qm/src/qmHandle.c @@ -56,14 +56,14 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { void qmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by VNODE - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, qmProcessFetchMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, QNODE_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QNODE_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, qmProcessFetchMsg, QNODE_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, qmProcessFetchMsg, QNODE_HANDLE); - dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, qmProcessFetchMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, qmProcessFetchMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, qmProcessFetchMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, qmProcessFetchMsg, QNODE_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, QNODE_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, qmProcessFetchMsg, QNODE_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, qmProcessFetchMsg, QNODE_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, qmProcessFetchMsg, QNODE_HANDLE); } diff --git a/source/dnode/mgmt/sm/src/smHandle.c b/source/dnode/mgmt/sm/src/smHandle.c index c522ef7fc3..dc5ee1155a 100644 --- a/source/dnode/mgmt/sm/src/smHandle.c +++ b/source/dnode/mgmt/sm/src/smHandle.c @@ -56,6 +56,6 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { void smInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by SNODE - dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, DEFAULT_HANDLE); } diff --git a/source/dnode/mgmt/vm/src/vmHandle.c b/source/dnode/mgmt/vm/src/vmHandle.c index c3ad74d246..6a51c9a77f 100644 --- a/source/dnode/mgmt/vm/src/vmHandle.c +++ b/source/dnode/mgmt/vm/src/vmHandle.c @@ -235,49 +235,49 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { void vmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by VNODE - dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, vmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, vmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, vmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); } diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 40b4f60bd4..d184e354c4 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -52,7 +52,7 @@ int32_t mndProcessFetchMsg(SNodeMsg *pReq) { } int32_t mndInitQuery(SMnode *pMnode) { - if (qWorkerInit(NODE_TYPE_MNODE, MND_VGID, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) { + if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) { mError("failed to init qworker in mnode since %s", terrstr()); return -1; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 11baeff04a..5e466c8cda 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -419,7 +419,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); } else { - SQueryNodeAddr addr = { .nodeId = MND_VGID, .epSet = pCxt->pPlanCxt->mgmtEpSet }; + SQueryNodeAddr addr = { .nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet }; taosArrayPush(pCxt->pExecNodeList, &addr); } pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet; -- GitLab