提交 4c999f74 编写于 作者: S Shengliang Guan

Enabling qnodes to process messages

上级 4d842cb1
...@@ -89,6 +89,9 @@ typedef struct { ...@@ -89,6 +89,9 @@ typedef struct {
} SDnodeWorker; } SDnodeWorker;
typedef struct SMsgHandle { typedef struct SMsgHandle {
int32_t vgId;
NodeMsgFp vgIdMsgFp;
SMgmtWrapper *pVgIdWrapper; // Handle the case where the same message type is distributed to qnode or vnode
NodeMsgFp msgFp; NodeMsgFp msgFp;
SMgmtWrapper *pWrapper; SMgmtWrapper *pWrapper;
} SMsgHandle; } SMsgHandle;
...@@ -114,6 +117,7 @@ typedef struct SMgmtWrapper { ...@@ -114,6 +117,7 @@ typedef struct SMgmtWrapper {
void *pMgmt; void *pMgmt;
SDnode *pDnode; SDnode *pDnode;
NodeMsgFp msgFps[TDMT_MAX]; NodeMsgFp msgFps[TDMT_MAX];
int32_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode
SMgmtFp fp; SMgmtFp fp;
} SMgmtWrapper; } SMgmtWrapper;
...@@ -149,7 +153,7 @@ typedef struct SDnode { ...@@ -149,7 +153,7 @@ typedef struct SDnode {
EDndStatus dndGetStatus(SDnode *pDnode); EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat); void dndSetStatus(SDnode *pDnode, EDndStatus stat);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType); SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp); void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId);
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
void dndSendMonitorReport(SDnode *pDnode); void dndSendMonitorReport(SDnode *pDnode);
......
...@@ -65,8 +65,9 @@ void dndCleanup() { ...@@ -65,8 +65,9 @@ void dndCleanup() {
dInfo("dnode env is cleaned up"); dInfo("dnode env is cleaned up");
} }
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) { void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId) {
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId;
} }
EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
......
...@@ -20,13 +20,26 @@ ...@@ -20,13 +20,26 @@
#define INTERNAL_CKEY "_key" #define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd" #define INTERNAL_SECRET "_pwd"
static inline void dndProcessQVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMsgHead *pHead = pMsg->pCont;
int32_t vgId = htonl(pHead->vgId);
SMgmtWrapper *pWrapper = pHandle->pWrapper;
if (vgId == pHandle->vgId && pHandle->pVgIdWrapper != NULL) {
pWrapper = pHandle->pVgIdWrapper;
}
dTrace("msg:%s will be processed by %s, handle:%p app:%p vgId:%d", TMSG_INFO(pMsg->msgType), pWrapper->name,
pMsg->handle, pMsg->ahandle, vgId);
dndProcessRpcMsg(pWrapper, pMsg, pEpSet);
}
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode *pDnode = parent; SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->trans; STransMgmt *pMgmt = &pDnode->trans;
tmsg_t msgType = pRsp->msgType; tmsg_t msgType = pRsp->msgType;
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
// if (pRsp == NULL || pRsp->pCont == NULL) return;
dTrace("rsp:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle); dTrace("rsp:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
rpcFreeCont(pRsp->pCont); rpcFreeCont(pRsp->pCont);
return; return;
...@@ -34,9 +47,13 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { ...@@ -34,9 +47,13 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
if (pHandle->msgFp != NULL) { if (pHandle->msgFp != NULL) {
dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType), if (pHandle->vgId == 0) {
pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code)); dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType),
dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet); pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code));
dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
} else {
dndProcessQVnodeRpcMsg(pHandle, pRsp, pEpSet);
}
} else { } else {
dError("rsp:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle); dError("rsp:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
rpcFreeCont(pRsp->pCont); rpcFreeCont(pRsp->pCont);
...@@ -110,9 +127,13 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { ...@@ -110,9 +127,13 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
if (pHandle->msgFp != NULL) { if (pHandle->msgFp != NULL) {
dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name, if (pHandle->vgId == 0) {
pReq->handle, pReq->ahandle); dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name,
dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet); pReq->handle, pReq->ahandle);
dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet);
} else {
dndProcessQVnodeRpcMsg(pHandle, pReq, pEpSet);
}
} else { } else {
dError("req:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle); dError("req:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle}; SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle};
...@@ -245,17 +266,24 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { ...@@ -245,17 +266,24 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
NodeMsgFp msgFp = pWrapper->msgFps[msgIndex]; NodeMsgFp msgFp = pWrapper->msgFps[msgIndex];
int32_t vgId = pWrapper->msgVgIds[msgIndex];
if (msgFp == NULL) continue; if (msgFp == NULL) continue;
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex]; SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
if (pHandle->msgFp != NULL) { if (pHandle->msgFp != NULL && pHandle->vgId == vgId) {
dError("msg:%s has multiple process nodes, prev node:%s, curr node:%s", tMsgInfo[msgIndex], dError("msg:%s has multiple process nodes, prev node:%s:%d, curr node:%s:%d", tMsgInfo[msgIndex],
pHandle->pWrapper->name, pWrapper->name); pHandle->pWrapper->name, pHandle->pWrapper->msgVgIds[msgIndex], pWrapper->name, vgId);
return -1; return -1;
} else { } else {
dTrace("msg:%s will be processed by %s", tMsgInfo[msgIndex], pWrapper->name); dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId);
pHandle->msgFp = msgFp; if (vgId == 0) {
pHandle->pWrapper = pWrapper; pHandle->msgFp = msgFp;
pHandle->pWrapper = pWrapper;
} else {
pHandle->vgId = vgId;
pHandle->vgIdMsgFp = msgFp;
pHandle->pVgIdWrapper = pWrapper;
}
} }
} }
} }
......
...@@ -114,19 +114,19 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -114,19 +114,19 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
void dmInitMsgHandles(SMgmtWrapper *pWrapper) { void dmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by DNODE // Requests handled by DNODE
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg, 0);
// Requests handled by MNODE // Requests handled by MNODE
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0);
} }
...@@ -75,78 +75,78 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -75,78 +75,78 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
void mmInitMsgHandles(SMgmtWrapper *pWrapper) { void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by DNODE // Requests handled by DNODE
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
// Requests handled by MNODE // Requests handled by MNODE
dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg); dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg); dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg); dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg); dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg); dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg); dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg); dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg); dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg); dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, 0);
// Requests handled by VNODE // Requests handled by VNODE
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)mmProcessWriteMsg, 0);
} }
...@@ -54,4 +54,16 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -54,4 +54,16 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
} }
void qmInitMsgHandles(SMgmtWrapper *pWrapper) {} void qmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by VNODE
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)qmProcessQueryMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)qmProcessQueryMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)qmProcessFetchMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)qmProcessFetchMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)qmProcessFetchMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)qmProcessFetchMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)qmProcessFetchMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)qmProcessFetchMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)qmProcessFetchMsg, 1);
}
...@@ -56,6 +56,6 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -56,6 +56,6 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
void smInitMsgHandles(SMgmtWrapper *pWrapper) { void smInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by SNODE // Requests handled by SNODE
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg); dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg, 0);
} }
...@@ -243,42 +243,42 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -243,42 +243,42 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
void vmInitMsgHandles(SMgmtWrapper *pWrapper) { void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by VNODE // Requests handled by VNODE
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
} }
...@@ -162,7 +162,7 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp ...@@ -162,7 +162,7 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
dError("vgId:%d, failed to write msg:%p to queue since %s", pHead->vgId, pMsg, terrstr()); dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr());
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册