提交 17215ffe 编写于 作者: S Shengliang Guan

shm

上级 718e3994
...@@ -161,7 +161,6 @@ int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) { ...@@ -161,7 +161,6 @@ int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
return dndProcessCreateNodeMsg(pDnode, BNODE, pMsg); return dndProcessCreateNodeMsg(pDnode, BNODE, pMsg);
case TDMT_DND_DROP_BNODE: case TDMT_DND_DROP_BNODE:
return dndProcessDropNodeMsg(pDnode, BNODE, pMsg); return dndProcessDropNodeMsg(pDnode, BNODE, pMsg);
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1; return -1;
......
...@@ -83,7 +83,6 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -83,7 +83,6 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
break; break;
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
dError("msg:%p, type:%s not processed in dnode queue", pRpc->handle, TMSG_INFO(pRpc->msgType)); dError("msg:%p, type:%s not processed in dnode queue", pRpc->handle, TMSG_INFO(pRpc->msgType));
} }
......
...@@ -16,9 +16,15 @@ ...@@ -16,9 +16,15 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "vmInt.h" #include "vmInt.h"
static void vmProcessQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessQueryMsg(pVnode->pImpl, pMsg); } static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed in vnode query queue", pMsg);
vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
}
static void vmProcessFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessFetchMsg(pVnode->pImpl, pMsg); } static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed in vnode fetch queue", pMsg);
vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
}
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
...@@ -60,43 +66,29 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO ...@@ -60,43 +66,29 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO
} }
static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
SRpcMsg *pMsg = NULL; SNodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
// todo // todo
SRpcMsg *pRsp = NULL; SRpcMsg *pRsp = NULL;
(void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); (void)vnodeApplyWMsg(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
} }
} }
static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
SRpcMsg *pMsg = NULL; SNodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
// todo // todo
SRpcMsg *pRsp = NULL; SRpcMsg *pRsp = NULL;
(void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp); (void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
} }
} }
static int32_t vmWriteMsgToQueue(STaosQueue *pQueue, SNodeMsg *pMsg, bool sendRsp) {
int32_t code = taosWriteQitem(pQueue, pMsg);
if (code != TSDB_CODE_SUCCESS && sendRsp) {
if (pMsg->rpcMsg.msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .code = code};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->rpcMsg.pCont);
}
return code;
}
static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) { static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) {
SRpcMsg *pMsg = &pNodeMsg->rpcMsg; SRpcMsg *pMsg = &pNodeMsg->rpcMsg;
...@@ -107,11 +99,6 @@ static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) { ...@@ -107,11 +99,6 @@ static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) {
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId); dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId);
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
} }
return pVnode; return pVnode;
...@@ -119,34 +106,38 @@ static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) { ...@@ -119,34 +106,38 @@ static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) {
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg);
if (pVnode != NULL) { if (pVnode == NULL) return -1;
(void)vmWriteMsgToQueue(pVnode->pWriteQ, pMsg, true);
int32_t code = taosWriteQitem(pVnode->pWriteQ, pMsg);
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
} return code;
} }
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg);
if (pVnode != NULL) { if (pVnode == NULL) return -1;
(void)vmWriteMsgToQueue(pVnode->pSyncQ, pMsg, true);
int32_t code = taosWriteQitem(pVnode->pSyncQ, pMsg);
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
} return code;
} }
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg);
if (pVnode != NULL) { if (pVnode == NULL) return -1;
(void)vmWriteMsgToQueue(pVnode->pQueryQ, pMsg, true);
int32_t code = taosWriteQitem(pVnode->pQueryQ, pMsg);
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
} return code;
} }
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg);
if (pVnode != NULL) { if (pVnode == NULL) return -1;
(void)vmWriteMsgToQueue(pVnode->pFetchQ, pMsg, true);
int32_t code = taosWriteQitem(pVnode->pFetchQ, pMsg);
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
} return code;
} }
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
...@@ -161,7 +152,8 @@ int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { ...@@ -161,7 +152,8 @@ int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg != NULL) { if (pMsg != NULL) {
code = vmWriteMsgToQueue(pVnode->pQueryQ, pMsg, false); pMsg->rpcMsg = *pRpc;
code = taosWriteQitem(pVnode->pQueryQ, pMsg);
} }
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
return code; return code;
...@@ -179,7 +171,8 @@ int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pRpc ...@@ -179,7 +171,8 @@ int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pRpc
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg != NULL) { if (pMsg != NULL) {
code = vmWriteMsgToQueue(pVnode->pApplyQ, pMsg, false); pMsg->rpcMsg = *pRpc;
code = taosWriteQitem(pVnode->pApplyQ, pMsg);
} }
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
return code; return code;
...@@ -217,7 +210,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { ...@@ -217,7 +210,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t code = -1; int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType; tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, will be processed", pMsg); dTrace("msg:%p, will be processed in vnode mgmt queue", pMsg);
switch (msgType) { switch (msgType) {
case TDMT_DND_CREATE_VNODE: case TDMT_DND_CREATE_VNODE:
...@@ -237,9 +230,7 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -237,9 +230,7 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
break; break;
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1; dError("msg:%p, not processed in mgmt queue", pMsg);
dError("RPC %p, dnode msg:%s not processed", pMsg->rpcMsg.handle, TMSG_INFO(msgType));
break;
} }
if (msgType & 1u) { if (msgType & 1u) {
...@@ -248,10 +239,9 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -248,10 +239,9 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont); rpcFreeCont(pMsg->rpcMsg.pCont);
pMsg->rpcMsg.pCont = NULL;
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
dTrace("msg:%p, is freed", pMsg);
} }
int32_t vmStartWorker(SVnodesMgmt *pMgmt) { int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
......
...@@ -17,17 +17,19 @@ ...@@ -17,17 +17,19 @@
#include "vnd.h" #include "vnd.h"
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SRpcMsg *pMsg; SNodeMsg *pMsg;
SRpcMsg *pRpc;
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i);
pRpc = &pMsg->rpcMsg;
// set request version // set request version
void *pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead));
int64_t ver = pVnode->state.processed++; int64_t ver = pVnode->state.processed++;
taosEncodeFixedI64(&pBuf, ver); taosEncodeFixedI64(&pBuf, ver);
if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) { if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
// TODO: handle error // TODO: handle error
/*ASSERT(false);*/ /*ASSERT(false);*/
vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr()); vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册