提交 f3e74788 编写于 作者: S Shengliang Guan

shm

上级 dabb896a
......@@ -22,13 +22,10 @@
extern "C" {
#endif
void bmInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle bmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
void bmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -30,9 +30,6 @@ typedef struct SBnodeMgmt {
SBnode *pBnode;
SRWLatch latch;
SDnodeWorker writeWorker;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SBnodeMgmt;
......
......@@ -23,8 +23,3 @@ int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
void bmInitMsgHandles(SMgmtWrapper *pWrapper) {
}
SMsgHandle bmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
return pMgmt->msgHandles[msgIndex];
}
......@@ -24,9 +24,8 @@ void bmGetMgmtFp(SMgmtWrapper *pWrapper) {
mgmtFp.openFp = NULL;
mgmtFp.closeFp = NULL;
mgmtFp.requiredFp = bmRequireNode;
mgmtFp.getMsgHandleFp = bmGetMsgHandle;
// bmInitMsgHandles(pWrapper);
bmInitMsgHandles(pWrapper);
pWrapper->name = "snode";
pWrapper->fp = mgmtFp;
}
......@@ -77,7 +77,6 @@ typedef void (*NodeMsgFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper);
typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper);
typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper);
typedef SMsgHandle (*GetMsgHandleFp)(SMgmtWrapper *pWrapper, int32_t msgIndex);
typedef struct SMsgHandle {
RpcMsgFp rpcMsgFp;
......@@ -89,7 +88,6 @@ typedef struct SMgmtFp {
OpenNodeFp openFp;
CloseNodeFp closeFp;
RequireNodeFp requiredFp;
GetMsgHandleFp getMsgHandleFp;
} SMgmtFp;
typedef struct SMgmtWrapper {
......@@ -100,6 +98,7 @@ typedef struct SMgmtWrapper {
SProcObj *pProc;
void *pMgmt;
SDnode *pDnode;
SMsgHandle msgHandles[TDMT_MAX];
SMgmtFp fp;
} SMgmtWrapper;
......@@ -145,6 +144,7 @@ void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
TdFilePtr dndCheckRunning(char *dataDir);
SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp);
// dndMonitor.h
void dndSendMonitorReport(SDnode *pDnode);
......
......@@ -26,7 +26,7 @@ int32_t dndInitServer(SDnode *pDnode);
void dndCleanupServer(SDnode *pDnode);
int32_t dndInitClient(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode);
int32_t dndSetMsgHandle(SDnode *pDnode);
int32_t dndInitMsgHandle(SDnode *pDnode);
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg);
......
......@@ -64,6 +64,14 @@ void dndCleanup() {
SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) { return &pDnode->wrappers[nodeType]; }
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
SMsgHandle *pHandle = &pWrapper->msgHandles[TMSG_INDEX(msgType)];
pHandle->pWrapper = pWrapper;
pHandle->nodeMsgFp = nodeMsgFp;
pHandle->rpcMsgFp = dndProcessRpcMsg;
}
EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
void dndSetStatus(SDnode *pDnode, EDndStatus status) {
......
......@@ -98,7 +98,7 @@ SDnode *dndCreate(SDndCfg *pCfg) {
bmGetMgmtFp(&pDnode->wrappers[BNODE]);
memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg));
if (dndSetMsgHandle(pDnode) != 0) {
if (dndInitMsgHandle(pDnode) != 0) {
goto _OVER;
}
......
......@@ -246,16 +246,14 @@ void dndCleanupServer(SDnode *pDnode) {
}
}
int32_t dndSetMsgHandle(SDnode *pDnode) {
int32_t dndInitMsgHandle(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->trans;
for (ENodeType nodeType = 0; nodeType < NODE_MAX; ++nodeType) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType];
GetMsgHandleFp getMsgHandleFp = pWrapper->fp.getMsgHandleFp;
if (getMsgHandleFp == NULL) continue;
SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType];
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
SMsgHandle msgHandle = (*getMsgHandleFp)(pWrapper, msgIndex);
SMsgHandle msgHandle = pWrapper->msgHandles[msgIndex];
if (msgHandle.rpcMsgFp == NULL) continue;
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
......
......@@ -22,9 +22,7 @@
extern "C" {
#endif
void dmInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle dmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
void dmInitMsgHandles(SMgmtWrapper *pWrapper);
void dmSendStatusReq(SDnodeMgmt *pMgmt);
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq);
......
......@@ -38,7 +38,6 @@ typedef struct SDnodeMgmt {
SRWLatch latch;
SDnodeWorker mgmtWorker;
SDnodeWorker statusWorker;
SMsgHandle msgHandles[TDMT_MAX];
const char *path;
SDnode *pDnode;
} SDnodeMgmt;
......
......@@ -114,15 +114,6 @@ void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
rpcSendResponse(&rpcRsp);
}
static void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
pHandle->pWrapper = pWrapper;
pHandle->nodeMsgFp = nodeMsgFp;
pHandle->rpcMsgFp = dndProcessRpcMsg;
}
void dmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by DNODE
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg);
......@@ -147,8 +138,3 @@ void dmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg);
}
SMsgHandle dmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
return pMgmt->msgHandles[msgIndex];
}
......@@ -177,7 +177,6 @@ void dmGetMgmtFp(SMgmtWrapper *pWrapper) {
mgmtFp.openFp = dmInit;
mgmtFp.closeFp = dmCleanup;
mgmtFp.requiredFp = dmRequire;
mgmtFp.getMsgHandleFp = dmGetMsgHandle;
dmInitMsgHandles(pWrapper);
pWrapper->name = "dnode";
......
......@@ -22,9 +22,7 @@
extern "C" {
#endif
void mmInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle mmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
void mmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
......
......@@ -38,7 +38,6 @@ typedef struct SMnodeMgmt {
SReplica replicas[TSDB_MAX_REPLICA];
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SMnodeMgmt;
......
......@@ -146,92 +146,78 @@ int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
static void mmSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
pHandle->pWrapper = pWrapper;
pHandle->nodeMsgFp = nodeMsgFp;
pHandle->rpcMsgFp = dndProcessRpcMsg;
}
void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by DNODE
mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg);
// Requests handled by MNODE
mmSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg);
// Requests handled by VNODE
mmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg);
}
SMsgHandle mmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return pMgmt->msgHandles[msgIndex];
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg);
}
......@@ -24,7 +24,6 @@ void mmGetMgmtFp(SMgmtWrapper *pWrapper) {
mgmtFp.openFp = NULL;
mgmtFp.closeFp = NULL;
mgmtFp.requiredFp = mmRequireNode;
mgmtFp.getMsgHandleFp = mmGetMsgHandle;
mmInitMsgHandles(pWrapper);
pWrapper->name = "mnode";
......
......@@ -22,9 +22,7 @@
extern "C" {
#endif
void qmInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle qmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
void qmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
......
......@@ -32,7 +32,6 @@ typedef struct SQnodeMgmt {
SDnodeWorker fetchWorker;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SQnodeMgmt;
......
......@@ -23,7 +23,3 @@ int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg){return 0;}
void qmInitMsgHandles(SMgmtWrapper *pWrapper) {
}
SMsgHandle qmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
return pMgmt->msgHandles[msgIndex];
}
......@@ -24,7 +24,6 @@ void qmGetMgmtFp(SMgmtWrapper *pWrapper) {
mgmtFp.openFp = NULL;
mgmtFp.closeFp = NULL;
mgmtFp.requiredFp = qmRequireNode;
mgmtFp.getMsgHandleFp = qmGetMsgHandle;
// qmInitMsgHandles(pWrapper);
pWrapper->name = "qnode";
......
......@@ -22,8 +22,7 @@
extern "C" {
#endif
void smInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle smGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
void smInitMsgHandles(SMgmtWrapper *pWrapper);
#ifdef __cplusplus
}
......
......@@ -29,14 +29,10 @@ typedef struct SSnodeMgmt {
SSnode *pSnode;
SRWLatch latch;
SDnodeWorker writeWorker;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
SProcObj *pProcess;
bool singleProc;
} SSnodeMgmt;
void smGetMgmtFp(SMgmtWrapper *pMgmt);
int32_t dndInitSnode(SDnode *pDnode);
......
......@@ -22,8 +22,3 @@ int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
void smInitMsgHandles(SMgmtWrapper *pWrapper) {
}
SMsgHandle smGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
return pMgmt->msgHandles[msgIndex];
}
......@@ -25,7 +25,6 @@ void smGetMgmtFp(SMgmtWrapper *pWrapper) {
mgmtFp.openFp = NULL;
mgmtFp.closeFp = NULL;
mgmtFp.requiredFp = smRequireNode;
mgmtFp.getMsgHandleFp = smGetMsgHandle;
// smInitMsgHandles(pWrapper);
pWrapper->name = "snode";
......
......@@ -22,9 +22,7 @@
extern "C" {
#endif
void vmInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle vmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
void vmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t vmProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t vmProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t vmProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
......
......@@ -43,7 +43,6 @@ typedef struct SVnodesMgmt {
SWWorkerPool syncPool;
SWWorkerPool writePool;
STfs *pTfs;
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SVnodesMgmt;
......
......@@ -25,51 +25,37 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;}
int32_t vmProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;}
int32_t vmProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;}
static void vmSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
pHandle->pWrapper = pWrapper;
pHandle->nodeMsgFp = nodeMsgFp;
pHandle->rpcMsgFp = dndProcessRpcMsg;
}
void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by VNODE
vmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg);
}
SMsgHandle vmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
return pMgmt->msgHandles[msgIndex];
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg);
}
......@@ -67,7 +67,6 @@ void vmGetMgmtFp(SMgmtWrapper *pWrapper) {
mgmtFp.openFp = vmInit;
mgmtFp.closeFp = vmCleanup;
mgmtFp.requiredFp = vmRequire;
mgmtFp.getMsgHandleFp = vmGetMsgHandle;
vmInitMsgHandles(pWrapper);
pWrapper->name = "vnodes";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册