提交 d78c54fb 编写于 作者: S Shengliang Guan

TD-1915

上级 e4e86609
...@@ -124,13 +124,12 @@ void dnodeFreeMPeerQueue() { ...@@ -124,13 +124,12 @@ void dnodeFreeMPeerQueue() {
void dnodeDispatchToMPeerQueue(SRpcMsg *pMsg) { void dnodeDispatchToMPeerQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMPeerQueue == NULL) { if (!mnodeIsRunning() || tsMPeerQueue == NULL) {
dnodeSendRedirectMsg(pMsg, false); dnodeSendRedirectMsg(pMsg, false);
rpcFreeCont(pMsg->pCont); } else {
return; SMnodeMsg *pPeer = mnodeCreateMsg(pMsg);
taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer);
} }
SMnodeMsg *pPeer = taosAllocateQitem(sizeof(SMnodeMsg)); rpcFreeCont(pMsg->pCont);
mnodeCreateMsg(pPeer, pMsg);
taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer);
} }
static void dnodeFreeMPeerMsg(SMnodeMsg *pPeer) { static void dnodeFreeMPeerMsg(SMnodeMsg *pPeer) {
......
...@@ -125,13 +125,12 @@ void dnodeFreeMReadQueue() { ...@@ -125,13 +125,12 @@ void dnodeFreeMReadQueue() {
void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) { void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMReadQueue == NULL) { if (!mnodeIsRunning() || tsMReadQueue == NULL) {
dnodeSendRedirectMsg(pMsg, true); dnodeSendRedirectMsg(pMsg, true);
rpcFreeCont(pMsg->pCont); } else {
return; SMnodeMsg *pRead = mnodeCreateMsg(pMsg);
taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead);
} }
SMnodeMsg *pRead = taosAllocateQitem(sizeof(SMnodeMsg)); rpcFreeCont(pMsg->pCont);
mnodeCreateMsg(pRead, pMsg);
taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead);
} }
static void dnodeFreeMReadMsg(SMnodeMsg *pRead) { static void dnodeFreeMReadMsg(SMnodeMsg *pRead) {
......
...@@ -125,16 +125,14 @@ void dnodeFreeMWritequeue() { ...@@ -125,16 +125,14 @@ void dnodeFreeMWritequeue() {
void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) { void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMWriteQueue == NULL) { if (!mnodeIsRunning() || tsMWriteQueue == NULL) {
dnodeSendRedirectMsg(pMsg, true); dnodeSendRedirectMsg(pMsg, true);
rpcFreeCont(pMsg->pCont); } else {
return; SMnodeMsg *pWrite = mnodeCreateMsg(pMsg);
}
SMnodeMsg *pWrite = taosAllocateQitem(sizeof(SMnodeMsg));
mnodeCreateMsg(pWrite, pMsg);
dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite, dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
rpcFreeCont(pMsg->pCont);
} }
static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) { static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) {
......
...@@ -47,6 +47,11 @@ typedef struct { ...@@ -47,6 +47,11 @@ typedef struct {
int32_t * vnodeList; int32_t * vnodeList;
} SOpenVnodeThread; } SOpenVnodeThread;
typedef struct {
SRpcMsg rpcMsg;
char pCont[];
} SMgmtMsg;
void * tsDnodeTmr = NULL; void * tsDnodeTmr = NULL;
static void * tsStatusTimer = NULL; static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime; static uint32_t tsRebootTime;
...@@ -172,38 +177,46 @@ void dnodeCleanupMgmt() { ...@@ -172,38 +177,46 @@ void dnodeCleanupMgmt() {
vnodeCleanupResources(); vnodeCleanupResources();
} }
void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) { static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
void *item; int32_t size = sizeof(SMgmtMsg) + pMsg->contLen;
SMgmtMsg *pMgmt = taosAllocateQitem(size);
if (pMgmt == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
item = taosAllocateQitem(sizeof(SRpcMsg)); pMgmt->rpcMsg = *pMsg;
if (item) { pMgmt->rpcMsg.pCont = pMgmt->pCont;
memcpy(item, pMsg, sizeof(SRpcMsg)); memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen);
taosWriteQitem(tsMgmtQueue, 1, item); taosWriteQitem(tsMgmtQueue, TAOS_QTYPE_RPC, pMgmt);
} else {
SRpcMsg rsp = {
.handle = pMsg->handle,
.pCont = NULL,
.code = TSDB_CODE_DND_OUT_OF_MEMORY
};
return TSDB_CODE_SUCCESS;
}
void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) {
int32_t code = dnodeWriteToMgmtQueue(pMsg);
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = code};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
} }
rpcFreeCont(pMsg->pCont);
} }
static void *dnodeProcessMgmtQueue(void *param) { static void *dnodeProcessMgmtQueue(void *param) {
SRpcMsg *pMsg; SMgmtMsg *pMgmt;
SRpcMsg * pMsg;
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
int type; int32_t qtype;
void * handle; void * handle;
while (1) { while (1) {
if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) { if (taosReadQitemFromQset(tsMgmtQset, &qtype, (void **)&pMgmt, &handle) == 0) {
dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset); dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset);
break; break;
} }
dDebug("%p, msg:%s will be processed", pMsg->ahandle, taosMsg[pMsg->msgType]); pMsg = &pMgmt->rpcMsg;
dDebug("%p, msg:%p:%s will be processed", pMsg->ahandle, pMgmt, taosMsg[pMsg->msgType]);
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else { } else {
...@@ -214,7 +227,6 @@ static void *dnodeProcessMgmtQueue(void *param) { ...@@ -214,7 +227,6 @@ static void *dnodeProcessMgmtQueue(void *param) {
rsp.pCont = NULL; rsp.pCont = NULL;
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
......
...@@ -50,9 +50,10 @@ typedef struct SMnodeMsg { ...@@ -50,9 +50,10 @@ typedef struct SMnodeMsg {
int32_t code; int32_t code;
void * pObj; void * pObj;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
char pCont[];
} SMnodeMsg; } SMnodeMsg;
void mnodeCreateMsg(SMnodeMsg *pMsg, SRpcMsg *rpcMsg); void * mnodeCreateMsg(SRpcMsg *pRpcMsg);
int32_t mnodeInitMsg(SMnodeMsg *pMsg); int32_t mnodeInitMsg(SMnodeMsg *pMsg);
void mnodeCleanupMsg(SMnodeMsg *pMsg); void mnodeCleanupMsg(SMnodeMsg *pMsg);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "taoserror.h" #include "taoserror.h"
#include "trpc.h" #include "trpc.h"
#include "tcache.h" #include "tqueue.h"
#include "mnode.h" #include "mnode.h"
#include "dnode.h" #include "dnode.h"
#include "mnodeDef.h" #include "mnodeDef.h"
...@@ -34,8 +34,15 @@ ...@@ -34,8 +34,15 @@
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
void mnodeCreateMsg(SMnodeMsg *pMsg, SRpcMsg *rpcMsg) { void *mnodeCreateMsg(SRpcMsg *pRpcMsg) {
pMsg->rpcMsg = *rpcMsg; int32_t size = sizeof(SMnodeMsg) + pRpcMsg->contLen;
SMnodeMsg *pMsg = taosAllocateQitem(size);
pMsg->rpcMsg = *pRpcMsg;
pMsg->rpcMsg.pCont = pMsg->pCont;
memcpy(pMsg->pCont, pRpcMsg->pCont, pRpcMsg->contLen);
return pMsg;
} }
int32_t mnodeInitMsg(SMnodeMsg *pMsg) { int32_t mnodeInitMsg(SMnodeMsg *pMsg) {
...@@ -54,7 +61,6 @@ int32_t mnodeInitMsg(SMnodeMsg *pMsg) { ...@@ -54,7 +61,6 @@ int32_t mnodeInitMsg(SMnodeMsg *pMsg) {
void mnodeCleanupMsg(SMnodeMsg *pMsg) { void mnodeCleanupMsg(SMnodeMsg *pMsg) {
if (pMsg != NULL) { if (pMsg != NULL) {
if (pMsg->rpcMsg.pCont) rpcFreeCont(pMsg->rpcMsg.pCont);
if (pMsg->pUser) mnodeDecUserRef(pMsg->pUser); if (pMsg->pUser) mnodeDecUserRef(pMsg->pUser);
if (pMsg->pDb) mnodeDecDbRef(pMsg->pDb); if (pMsg->pDb) mnodeDecDbRef(pMsg->pDb);
if (pMsg->pVgroup) mnodeDecVgroupRef(pMsg->pVgroup); if (pMsg->pVgroup) mnodeDecVgroupRef(pMsg->pVgroup);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册