diff --git a/source/dnode/mgmt/bnode/inc/bmFile.h b/source/dnode/mgmt/bnode/inc/bmFile.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/bnode/inc/bmHandle.h b/source/dnode/mgmt/bnode/inc/bmHandle.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/bnode/inc/bmMgmt.h b/source/dnode/mgmt/bnode/inc/bmMgmt.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/bnode/inc/bmWorker.h b/source/dnode/mgmt/bnode/inc/bmWorker.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/bnode/src/bmFile.c b/source/dnode/mgmt/bnode/src/bmFile.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/bnode/src/bmHandle.c b/source/dnode/mgmt/bnode/src/bmHandle.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/bnode/src/bmInt.c b/source/dnode/mgmt/bnode/src/bmInt.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/bnode/src/dndBnode.c b/source/dnode/mgmt/bnode/src/bmMgmt.c similarity index 100% rename from source/dnode/mgmt/bnode/src/dndBnode.c rename to source/dnode/mgmt/bnode/src/bmMgmt.c diff --git a/source/dnode/mgmt/bnode/src/bmWorker.c b/source/dnode/mgmt/bnode/src/bmWorker.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/dnode/inc/dndInt.h b/source/dnode/mgmt/dnode/inc/dndInt.h index d0b74834d3c0f447d1cdaf9e30a0f8237dce6c19..ecea1f159fce5aa2963513021bc13359c8eb0451 100644 --- a/source/dnode/mgmt/dnode/inc/dndInt.h +++ b/source/dnode/mgmt/dnode/inc/dndInt.h @@ -63,13 +63,24 @@ typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStat; typedef struct SMgmtFp SMgmtFp; typedef struct SMgmtWrapper SMgmtWrapper; +typedef struct SMsgHandle SMsgHandle; +typedef void (*RpcMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEps); +typedef void (*NodeMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); + -typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps); typedef int32_t (*MndMsgFp)(SDnode *pDnode, SMndMsg *pMsg); + typedef SMgmtWrapper *(*MgmtOpenFp)(SDnode *pDnode, const char *path); -typedef void (*MgmtCloseFp)(SDnode *pDnode, SMgmtWrapper *pMgmt); -typedef bool (*MgmtRequiredFp)(SMgmtWrapper *pMgmt); -typedef SArray *(*MgmtMsgFp)(SMgmtWrapper *pNode, SNodeMsg *pMsg); +typedef void (*MgmtCloseFp)(SDnode *pDnode, SMgmtWrapper *pWrapper); +typedef bool (*MgmtRequiredFp)(SMgmtWrapper *pWrapper); +typedef int32_t (*MgmtHandleMsgFp)(SMgmtWrapper *pNode, SNodeMsg *pMsg); +typedef SMsgHandle (*GetMsgHandleFp)(SMgmtWrapper *pWrapper, int32_t msgIndex); + +typedef struct SMsgHandle { + RpcMsgFp rpcMsgFp; + NodeMsgFp nodeMsgFp; + SMgmtWrapper *pWrapper; +} SMsgHandle; typedef struct { EWorkerType type; @@ -116,7 +127,7 @@ typedef struct { SReplica replicas[TSDB_MAX_REPLICA]; // - MndMsgFp msgFp[TDMT_MAX]; + SMsgHandle msgHandles[TDMT_MAX]; SProcObj *pProcess; bool singleProc; } SMnodeMgmt; @@ -171,16 +182,16 @@ typedef struct { } SVnodesMgmt; typedef struct { - void *serverRpc; - void *clientRpc; - DndMsgFp msgFp[TDMT_MAX]; + void *serverRpc; + void *clientRpc; + SMsgHandle msgHandles[TDMT_MAX]; } STransMgmt; typedef struct SMgmtFp { MgmtOpenFp openFp; MgmtCloseFp closeFp; MgmtRequiredFp requiredFp; - MgmtMsgFp msgFp; + GetMsgHandleFp getMsgHandleFp; } SMgmtFp; typedef struct SMgmtWrapper { @@ -216,6 +227,10 @@ void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); TdFilePtr dndCheckRunning(char *dataDir); +SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) ; + +void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/dnode/inc/dndMain.h b/source/dnode/mgmt/dnode/inc/dndMain.h index 37d5f63396bf71ab568cd782e589a4721f1e952e..f1153c64d977cfaf38f4e5be878f6ba02ccbb3d0 100644 --- a/source/dnode/mgmt/dnode/inc/dndMain.h +++ b/source/dnode/mgmt/dnode/inc/dndMain.h @@ -22,6 +22,8 @@ extern "C" { #endif +void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/dnode/inc/dndTransport.h b/source/dnode/mgmt/dnode/inc/dndTransport.h index c81ce9fd70cd69159e846413569140af831eced1..f6d7d97758dc31f465f84a386bde2bdc3d0a0686 100644 --- a/source/dnode/mgmt/dnode/inc/dndTransport.h +++ b/source/dnode/mgmt/dnode/inc/dndTransport.h @@ -23,8 +23,9 @@ extern "C" { #endif int32_t dndInitTrans(SDnode *pDnode); -void dndCleanupTransClient(SDnode *pDnode); void dndCleanupTrans(SDnode *pDnode); +void dndCleanupClient(SDnode *pDnode); + int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/dnode/src/dndInt.c b/source/dnode/mgmt/dnode/src/dndInt.c index cd6fee812ceb76fa74299d47bdddd11b7ba94eb7..a23cfbbfe1c33d8c2f6f7583511c163f881f803f 100644 --- a/source/dnode/mgmt/dnode/src/dndInt.c +++ b/source/dnode/mgmt/dnode/src/dndInt.c @@ -16,6 +16,8 @@ #define _DEFAULT_SOURCE #include "dndInt.h" +static int8_t once = DND_ENV_INIT; + EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } void dndSetStatus(SDnode *pDnode, EDndStatus status) { @@ -72,7 +74,70 @@ TdFilePtr dndCheckRunning(char *dataDir) { return pFile; } +int32_t dndInit() { + if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { + terrno = TSDB_CODE_REPEAT_INIT; + dError("failed to init dnode env since %s", terrstr()); + return -1; + } + + taosIgnSIGPIPE(); + taosBlockSIGPIPE(); + taosResolveCRC(); + + if (rpcInit() != 0) { + dError("failed to init rpc since %s", terrstr()); + dndCleanup(); + return -1; + } + + if (walInit() != 0) { + dError("failed to init wal since %s", terrstr()); + dndCleanup(); + return -1; + } + + // SVnodeOpt vnodeOpt = { + // .nthreads = tsNumOfCommitThreads, .putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp = + // dndSendReqToDnode}; + + // if (vnodeInit(&vnodeOpt) != 0) { + // dError("failed to init vnode since %s", terrstr()); + // dndCleanup(); + // return -1; + // } + + SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn, .comp = tsMonitorComp}; + if (monInit(&monCfg) != 0) { + dError("failed to init monitor since %s", terrstr()); + dndCleanup(); + return -1; + } + + dInfo("dnode env is initialized"); + return 0; +} + +void dndCleanup() { + if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { + dError("dnode env is already cleaned up"); + return; + } + + walCleanUp(); + // vnodeCleanup(); + rpcCleanup(); + monCleanup(); + + taosStopCacheRefreshWorker(); + dInfo("dnode env is cleaned up"); +} + void dndeHandleEvent(SDnode *pDnode, EDndEvent event) { dInfo("dnode object receive event %d, data:%p", event, pDnode); pDnode->event = event; } + +SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) { + return &pDnode->mgmts[nodeType]; +} \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/src/dndMain.c b/source/dnode/mgmt/dnode/src/dndMain.c index 53d8e24f5560c688351f11cc2d255f6c17e5e23b..20e3bfffb752bc8c05671bd882b47a271b843c58 100644 --- a/source/dnode/mgmt/dnode/src/dndMain.c +++ b/source/dnode/mgmt/dnode/src/dndMain.c @@ -24,8 +24,6 @@ #include "smInt.h" #include "vmInt.h" -static int8_t once = DND_ENV_INIT; - static void dndResetLog(SMgmtWrapper *pMgmt) { char logname[24] = {0}; snprintf(logname, sizeof(logname), "%slog", pMgmt->name); @@ -171,55 +169,6 @@ _OVER: return pDnode; } -#if 0 - - - - - - - if (dndInitVnodes(pDnode) != 0) { - dError("failed to init vnodes since %s", terrstr()); - dndClose(pDnode); - return NULL; - } - - if (dndInitQnode(pDnode) != 0) { - dError("failed to init qnode since %s", terrstr()); - dndClose(pDnode); - return NULL; - } - - if (dndInitSnode(pDnode) != 0) { - dError("failed to init snode since %s", terrstr()); - dndClose(pDnode); - return NULL; - } - - if (dndInitBnode(pDnode) != 0) { - dError("failed to init bnode since %s", terrstr()); - dndClose(pDnode); - return NULL; - } - - if (mmInit(pDnode) != 0) { - dError("failed to init mnode since %s", terrstr()); - dndClose(pDnode); - return NULL; - } - - -// mmCleanup(pDnode); - // dndCleanupBnode(pDnode); - // dndCleanupSnode(pDnode); - // dndCleanupQnode(pDnode); - // dndCleanupVnodes(pDnode); - - - return pDnode; -} -#endif - void dndClose(SDnode *pDnode) { if (pDnode == NULL) return; @@ -237,67 +186,12 @@ void dndClose(SDnode *pDnode) { dInfo("dnode object is closed, data:%p", pDnode); } -int32_t dndInit() { - if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { - terrno = TSDB_CODE_REPEAT_INIT; - dError("failed to init dnode env since %s", terrstr()); - return -1; - } - - taosIgnSIGPIPE(); - taosBlockSIGPIPE(); - taosResolveCRC(); - - if (rpcInit() != 0) { - dError("failed to init rpc since %s", terrstr()); - dndCleanup(); - return -1; - } - - if (walInit() != 0) { - dError("failed to init wal since %s", terrstr()); - dndCleanup(); - return -1; - } - - // SVnodeOpt vnodeOpt = { - // .nthreads = tsNumOfCommitThreads, .putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp = - // dndSendReqToDnode}; - - // if (vnodeInit(&vnodeOpt) != 0) { - // dError("failed to init vnode since %s", terrstr()); - // dndCleanup(); - // return -1; - // } - - SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn, .comp = tsMonitorComp}; - if (monInit(&monCfg) != 0) { - dError("failed to init monitor since %s", terrstr()); - dndCleanup(); - return -1; - } - - dInfo("dnode env is initialized"); - return 0; -} - -void dndCleanup() { - if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { - dError("dnode env is already cleaned up"); - return; - } - - walCleanUp(); - // vnodeCleanup(); - rpcCleanup(); - monCleanup(); - - taosStopCacheRefreshWorker(); - dInfo("dnode env is cleaned up"); -} - void dndRun(SDnode *pDnode) { while (pDnode->event != DND_EVENT_STOP) { taosMsleep(100); } } + +void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet) { + +} \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/src/dndTransport.c b/source/dnode/mgmt/dnode/src/dndTransport.c index 831fecb855b7014378d6142b27c1d3b7866c84f0..7bcdd042788b783cbfb9d6b97ed43653907a3f80 100644 --- a/source/dnode/mgmt/dnode/src/dndTransport.c +++ b/source/dnode/mgmt/dnode/src/dndTransport.c @@ -22,13 +22,13 @@ #define _DEFAULT_SOURCE #include "dndTransport.h" #include "dndMgmt.h" -#include "mm.h" -#include "dndVnodes.h" +#include "mmInt.h" -#define INTERNAL_USER "_dnd" -#define INTERNAL_CKEY "_key" +#define INTERNAL_USER "_dnd" +#define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" +#if 0 static void dndInitMsgFp(STransMgmt *pMgmt) { // Requests handled by DNODE pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg; @@ -155,8 +155,10 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg; } +#endif + static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { - SDnode * pDnode = parent; + SDnode *pDnode = parent; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pRsp->msgType; @@ -168,11 +170,11 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { return; } - DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)]; - if (fp != NULL) { - dTrace("RPC %p, rsp:%s will be processed, code:0x%x app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->code & 0XFFFF, - pRsp->ahandle); - (*fp)(pDnode, pRsp, pEpSet); + SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; + if (pHandle->rpcMsgFp != NULL) { + dTrace("RPC %p, rsp:%s will be processed by %s, code:0x%x app:%p", pRsp->handle, TMSG_INFO(msgType), + pHandle->pWrapper->name, pRsp->code & 0XFFFF, pRsp->ahandle); + (*pHandle->rpcMsgFp)(pDnode, pHandle->pWrapper, pRsp, pEpSet); } else { dError("RPC %p, rsp:%s not processed, app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->ahandle); rpcFreeCont(pRsp->pCont); @@ -184,7 +186,7 @@ static int32_t dndInitClient(SDnode *pDnode) { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.label = "D-C"; + rpcInit.label = "CLI"; rpcInit.numOfThreads = 1; rpcInit.cfp = dndProcessResponse; rpcInit.sessions = 1024; @@ -209,7 +211,7 @@ static int32_t dndInitClient(SDnode *pDnode) { return 0; } -static void dndCleanupClient(SDnode *pDnode) { +void dndCleanupClient(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->tmgmt; if (pMgmt->clientRpc) { rpcClose(pMgmt->clientRpc); @@ -219,7 +221,7 @@ static void dndCleanupClient(SDnode *pDnode) { } static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { - SDnode * pDnode = param; + SDnode *pDnode = param; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pReq->msgType; @@ -250,10 +252,11 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { return; } - DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)]; - if (fp != NULL) { - dTrace("RPC %p, req:%s will be processed, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); - (*fp)(pDnode, pReq, pEpSet); + SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; + if (pHandle->rpcMsgFp != NULL) { + dTrace("RPC %p, req:%s will be processed by %s, app:%p", pReq->handle, TMSG_INFO(msgType), pHandle->pWrapper->name, + pReq->ahandle); + (*pHandle->rpcMsgFp)(pDnode, pHandle->pWrapper, pReq, pEpSet); } else { dError("RPC %p, req:%s not processed since no handle, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle}; @@ -270,37 +273,37 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp); } -static int32_t dndAuthInternalReq(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { +static int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { + int32_t code = 0; + char pass[TSDB_PASSWORD_LEN + 1] = {0}; + if (strcmp(user, INTERNAL_USER) == 0) { - char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass); - memcpy(secret, pass, TSDB_PASSWORD_LEN); - *spi = 1; - *encrypt = 0; - *ckey = 0; - return 0; } else if (strcmp(user, TSDB_NETTEST_USER) == 0) { - char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass); + } else { + code = -1; + } + + if (code == 0) { memcpy(secret, pass, TSDB_PASSWORD_LEN); *spi = 1; *encrypt = 0; *ckey = 0; - return 0; - } else { - return -1; } + + return code; } static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) { SDnode *pDnode = parent; - if (dndAuthInternalReq(parent, user, spi, encrypt, secret, ckey) == 0) { + if (dndGetHideUserAuth(parent, user, spi, encrypt, secret, ckey) == 0) { dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt); return 0; } - if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) == 0) { + if (mmGetUserAuth(dndGetWrapper(pDnode, MNODE), user, spi, encrypt, secret, ckey) == 0) { dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt); return 0; } @@ -313,7 +316,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char SAuthReq authReq = {0}; tstrncpy(authReq.user, user, TSDB_USER_LEN); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); - void * pReq = rpcMallocCont(contLen); + void *pReq = rpcMallocCont(contLen); tSerializeSAuthReq(pReq, contLen, &authReq); SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; @@ -341,7 +344,6 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char static int32_t dndInitServer(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->tmgmt; - dndInitMsgFp(pMgmt); int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); if (numOfThreads < 1) { @@ -351,7 +353,7 @@ static int32_t dndInitServer(SDnode *pDnode) { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = pDnode->cfg.serverPort; - rpcInit.label = "D-S"; + rpcInit.label = "SRV"; rpcInit.numOfThreads = numOfThreads; rpcInit.cfp = dndProcessRequest; rpcInit.sessions = tsMaxShellConns; @@ -379,7 +381,40 @@ static void dndCleanupServer(SDnode *pDnode) { } } +static int32_t dndSetMsgHandle(SDnode *pDnode) { + STransMgmt *pMgmt = &pDnode->tmgmt; + + for (ENodeType nodeType = 0; nodeType < NODE_MAX; ++nodeType) { + SMgmtWrapper *pWrapper = &pDnode->mgmts[nodeType]; + GetMsgHandleFp getMsgHandleFp = pDnode->fps[nodeType].getMsgHandleFp; + if (getMsgHandleFp == NULL) continue; + + for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { + SMsgHandle msgHandle = (*getMsgHandleFp)(pWrapper, msgIndex); + if (msgHandle.rpcMsgFp == NULL) continue; + + SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex]; + if (pHandle->rpcMsgFp != NULL) { + dError("msg:%s, has multiple process nodes, prev node:%s, curr node:%s", tMsgInfo[msgIndex], + pHandle->pWrapper->name, pWrapper->name); + return -1; + } else { + dDebug("msg:%s, will be processed by node:%s", tMsgInfo[msgIndex], pWrapper->name); + *pHandle = msgHandle; + } + } + } + + return 0; +} + int32_t dndInitTrans(SDnode *pDnode) { + dInfo("dnode-transport start to init"); + + if (dndSetMsgHandle(pDnode) != 0) { + return -1; + } + if (dndInitClient(pDnode) != 0) { return -1; } diff --git a/source/dnode/mgmt/mnode/inc/mmFile.h b/source/dnode/mgmt/mnode/inc/mmFile.h new file mode 100644 index 0000000000000000000000000000000000000000..0aae15f077d1667977a909a700f0682ddc71d31f --- /dev/null +++ b/source/dnode/mgmt/mnode/inc/mmFile.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_MNODE_FILE_H_ +#define _TD_DND_MNODE_FILE_H_ + +#include "mmInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mmReadFile(SDnode *pDnode); +int32_t mmWriteFile(SDnode *pDnode); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_MNODE_FILE_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/inc/mmHandle.h b/source/dnode/mgmt/mnode/inc/mmHandle.h new file mode 100644 index 0000000000000000000000000000000000000000..9bad5ffb788ad88e9d5336fc6413cb6aff9b42f4 --- /dev/null +++ b/source/dnode/mgmt/mnode/inc/mmHandle.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_MNODE_HANDLE_H_ +#define _TD_DND_MNODE_HANDLE_H_ + +#include "mmInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +void mmInitMsgHandles(SMgmtWrapper *pWrapper); +SMsgHandle mmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); + +int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey); +int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, + SMonGrantInfo *pGrantInfo); + + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_MNODE_HANDLE_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index 9d1a5c934f29a6f878abbecdefdef05849a7cef3..aa0d567643d3ca102013e180c1bfdc23ff09a455 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -16,46 +16,26 @@ #ifndef _TD_DND_MNODE_MGMT_H_ #define _TD_DND_MNODE_MGMT_H_ +#include "dndInt.h" + #ifdef __cplusplus extern "C" { #endif -#include "dndInt.h" -SMgmtFp mmGetMgmtFp(); // interface +SMgmtFp mmGetMgmtFp(); + int32_t mmInit(SDnode *pDnode); void mmCleanup(SDnode *pDnode); int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -// mmFile -int32_t mmReadFile(SDnode *pDnode); -int32_t mmWriteFile(SDnode *pDnode); - -// mmHandle -int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey); +int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, SMonGrantInfo *pGrantInfo); -// mmMgmt -SMnode *mmAcquire(SDnode *pDnode); -void mmRelease(SDnode *pDnode, SMnode *pMnode); -int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption); -int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption); -int32_t mmDrop(SDnode *pDnode); -int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate); - -// mmWorker -int32_t mmStartWorker(SDnode *pDnode); -void mmStopWorker(SDnode *pDnode); -void mmInitMsgFp(SMnodeMgmt *pMgmt); -void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); -void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mnode/inc/mmMgmt.h b/source/dnode/mgmt/mnode/inc/mmMgmt.h new file mode 100644 index 0000000000000000000000000000000000000000..d2075e40b6576a575ffbc832b0383064cbc5531e --- /dev/null +++ b/source/dnode/mgmt/mnode/inc/mmMgmt.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_MNODE_MGMT_H_ +#define _TD_DND_MNODE_MGMT_H_ + +#include "mmInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +SMnode *mmAcquire(SDnode *pDnode); +void mmRelease(SDnode *pDnode, SMnode *pMnode); +int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption); +int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption); +int32_t mmDrop(SDnode *pDnode); +int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_MNODE_MGMT_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/inc/mmWorker.h b/source/dnode/mgmt/mnode/inc/mmWorker.h new file mode 100644 index 0000000000000000000000000000000000000000..0ffe109cbd2a58fd8ab261003b7858842e77ab2e --- /dev/null +++ b/source/dnode/mgmt/mnode/inc/mmWorker.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_MNODE_WORKER_H_ +#define _TD_DND_MNODE_WORKER_H_ + +#include "mmInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mmStartWorker(SDnode *pDnode); +void mmStopWorker(SDnode *pDnode); +void mmInitMsgFp(SMnodeMgmt *pMgmt); +void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); +void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); +void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); + +void mmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +void mmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +void mmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_MNODE_WORKER_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/src/mmHandle.c b/source/dnode/mgmt/mnode/src/mmHandle.c index 2fb0cb113ec4d67f80d9a1362ce8e7c65666a4c3..a44c05ccf23638f0f8e1b3c82f08b5019965cedd 100644 --- a/source/dnode/mgmt/mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mnode/src/mmHandle.c @@ -14,7 +14,8 @@ */ #define _DEFAULT_SOURCE -#include "mmInt.h" +#include "mmHandle.h" +#include "mmWorker.h" #if 0 #include "dndMgmt.h" @@ -122,7 +123,7 @@ int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgro return code; } -int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { +int32_t mmGetUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = mmAcquire(pDnode); @@ -139,4 +140,94 @@ int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *enc return code; } -#endif \ No newline at end of file +#endif + +static void mmSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; + + pHandle->pWrapper = pWrapper; + pHandle->nodeMsgFp = nodeMsgFp; + pHandle->rpcMsgFp = dndProcessRpcMsg; +} + +void mmInitMsgHandles(SMgmtWrapper *pWrapper) { + // Requests handled by DNODE + mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg); + + // Requests handled by MNODE + mmSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, mmProcessReadMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg); + + // Requests handled by VNODE + mmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg); + mmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg); +} + +SMsgHandle mmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + return pMgmt->msgHandles[msgIndex]; +} diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c new file mode 100644 index 0000000000000000000000000000000000000000..9c0d9ade845e97095f656a407b11deeb1499e31c --- /dev/null +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mmInt.h" +#include "mmHandle.h" + +SMgmtFp mmGetMgmtFp() { + SMgmtFp mgmtFp = {0}; + mgmtFp.getMsgHandleFp = mmGetMsgHandle; +} + + +int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey) { + return 0; +} diff --git a/source/dnode/mgmt/mnode/src/mmMgmt.c b/source/dnode/mgmt/mnode/src/mmMgmt.c index d4e61336ad2f066e551d3993260a5c3efe78dd8f..2abbefae2274e431d210454a2280d93a22e8520a 100644 --- a/source/dnode/mgmt/mnode/src/mmMgmt.c +++ b/source/dnode/mgmt/mnode/src/mmMgmt.c @@ -317,4 +317,5 @@ static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) { return 0; } -#endif \ No newline at end of file +#endif + diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 021f35e8d7d23c03effa680abeb306b29df144ea..c0150e4dca413ea74ca1fdfd36f9258cee81023c 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -65,79 +65,7 @@ void mmStopWorker(SDnode *pDnode) { } void mmInitMsgFp(SMnodeMgmt *pMgmt) { - // Requests handled by DNODE - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_QNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_QNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_SNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_SNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_BNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_BNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = mmProcessWriteMsg; - - // Requests handled by MNODE - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONNECT)] = mmProcessReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_ACCT)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_ACCT)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_ACCT)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_USER)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_USER)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_USER)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = mmProcessReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DNODE)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONFIG_DNODE)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DNODE)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_MNODE)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_MNODE)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_QNODE)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_QNODE)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_SNODE)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_SNODE)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_BNODE)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_BNODE)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_USE_DB)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_DB)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYNC_DB)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_COMPACT_DB)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_FUNC)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_FUNC)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TABLE_META)] = mmProcessReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = mmProcessReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = mmProcessReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = mmProcessReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = mmProcessReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = mmProcessReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = mmProcessReadMsg; - - // Requests handled by VNODE - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB_RSP)] = mmProcessWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = mmProcessWriteMsg; + } static void mmSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) { diff --git a/source/dnode/mgmt/qnode/inc/qmFile.h b/source/dnode/mgmt/qnode/inc/qmFile.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/qnode/inc/qmHandle.h b/source/dnode/mgmt/qnode/inc/qmHandle.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/qnode/inc/qmMgmt.h b/source/dnode/mgmt/qnode/inc/qmMgmt.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/qnode/inc/qmWorker.h b/source/dnode/mgmt/qnode/inc/qmWorker.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/qnode/src/qmFile.c b/source/dnode/mgmt/qnode/src/qmFile.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/qnode/src/qmHandle.c b/source/dnode/mgmt/qnode/src/qmHandle.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/qnode/src/qmInt.c b/source/dnode/mgmt/qnode/src/qmInt.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/qnode/src/dndQnode.c b/source/dnode/mgmt/qnode/src/qmMgmt.c similarity index 100% rename from source/dnode/mgmt/qnode/src/dndQnode.c rename to source/dnode/mgmt/qnode/src/qmMgmt.c diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/snode/inc/smFile.h b/source/dnode/mgmt/snode/inc/smFile.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/snode/inc/smHandle.h b/source/dnode/mgmt/snode/inc/smHandle.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/snode/inc/smMgmt.h b/source/dnode/mgmt/snode/inc/smMgmt.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/snode/inc/smWorker.h b/source/dnode/mgmt/snode/inc/smWorker.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/snode/src/smFile.c b/source/dnode/mgmt/snode/src/smFile.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/snode/src/smHandle.c b/source/dnode/mgmt/snode/src/smHandle.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/snode/src/smInt.c b/source/dnode/mgmt/snode/src/smInt.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/snode/src/dndSnode.c b/source/dnode/mgmt/snode/src/smMgmt.c similarity index 100% rename from source/dnode/mgmt/snode/src/dndSnode.c rename to source/dnode/mgmt/snode/src/smMgmt.c diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/vnode/inc/vmFile.h b/source/dnode/mgmt/vnode/inc/vmFile.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/vnode/inc/vmHandle.h b/source/dnode/mgmt/vnode/inc/vmHandle.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/vnode/inc/vmMgmt.h b/source/dnode/mgmt/vnode/inc/vmMgmt.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/vnode/inc/vmWorker.h b/source/dnode/mgmt/vnode/inc/vmWorker.h new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/vnode/src/vmFile.c b/source/dnode/mgmt/vnode/src/vmFile.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/vnode/src/vmHandle.c b/source/dnode/mgmt/vnode/src/vmHandle.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/vnode/src/dndVnodes.c b/source/dnode/mgmt/vnode/src/vmMgmt.c similarity index 100% rename from source/dnode/mgmt/vnode/src/dndVnodes.c rename to source/dnode/mgmt/vnode/src/vmMgmt.c diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391