提交 08b087ec 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

add a queue for mgmt message from mnode

上级 72e8540c
...@@ -22,7 +22,7 @@ extern "C" { ...@@ -22,7 +22,7 @@ extern "C" {
int32_t dnodeInitMgmt(); int32_t dnodeInitMgmt();
void dnodeCleanupMgmt(); void dnodeCleanupMgmt();
void dnodeDispatchToDnodeMgmt(SRpcMsg *rpcMsg); void dnodeDispatchToMgmtQueue(SRpcMsg *rpcMsg);
void* dnodeGetVnode(int32_t vgId); void* dnodeGetVnode(int32_t vgId);
int32_t dnodeGetVnodeStatus(void *pVnode); int32_t dnodeGetVnodeStatus(void *pVnode);
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "ttimer.h" #include "ttimer.h"
#include "tsdb.h" #include "tsdb.h"
#include "twal.h" #include "twal.h"
#include "tqueue.h"
#include "tsync.h" #include "tsync.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -46,6 +47,9 @@ static SRpcIpSet tsDMnodeIpSetForPeer = {0}; ...@@ -46,6 +47,9 @@ static SRpcIpSet tsDMnodeIpSetForPeer = {0};
static SRpcIpSet tsDMnodeIpSetForShell = {0}; static SRpcIpSet tsDMnodeIpSetForShell = {0};
static SDMMnodeInfos tsDMnodeInfos = {0}; static SDMMnodeInfos tsDMnodeInfos = {0};
static SDMDnodeCfg tsDnodeCfg = {0}; static SDMDnodeCfg tsDnodeCfg = {0};
static taos_qset tsMgmtQset = NULL;
static taos_queue tsMgmtQueue = NULL;
static pthread_t tsQthread;
static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes); static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes);
static bool dnodeReadMnodeInfos(); static bool dnodeReadMnodeInfos();
...@@ -55,6 +59,7 @@ static bool dnodeReadDnodeCfg(); ...@@ -55,6 +59,7 @@ static bool dnodeReadDnodeCfg();
static void dnodeSaveDnodeCfg(); static void dnodeSaveDnodeCfg();
static void dnodeProcessStatusRsp(SRpcMsg *pMsg); static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
static void dnodeSendStatusMsg(void *handle, void *tmrId); static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void *dnodeProcessMgmtQueue(void *param);
static int32_t dnodeOpenVnodes(); static int32_t dnodeOpenVnodes();
static void dnodeCloseVnodes(); static void dnodeCloseVnodes();
...@@ -74,12 +79,6 @@ int32_t dnodeInitMgmt() { ...@@ -74,12 +79,6 @@ int32_t dnodeInitMgmt() {
dnodeReadDnodeCfg(); dnodeReadDnodeCfg();
tsRebootTime = taosGetTimestampSec(); tsRebootTime = taosGetTimestampSec();
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
if (tsDnodeTmr == NULL) {
dError("failed to init dnode timer");
return -1;
}
if (!dnodeReadMnodeInfos()) { if (!dnodeReadMnodeInfos()) {
memset(&tsDMnodeIpSetForPeer, 0, sizeof(SRpcIpSet)); memset(&tsDMnodeIpSetForPeer, 0, sizeof(SRpcIpSet));
memset(&tsDMnodeIpSetForShell, 0, sizeof(SRpcIpSet)); memset(&tsDMnodeIpSetForShell, 0, sizeof(SRpcIpSet));
...@@ -118,8 +117,45 @@ int32_t dnodeInitMgmt() { ...@@ -118,8 +117,45 @@ int32_t dnodeInitMgmt() {
} }
} }
int32_t code = dnodeOpenVnodes(); // create the queue and thread to handle the message
tsMgmtQset = taosOpenQset();
if (tsMgmtQset == NULL) {
dError("failed to create the mgmt queue set");
dnodeCleanupMgmt();
return -1;
}
tsMgmtQueue = taosOpenQueue();
if (tsMgmtQueue == NULL) {
dError("failed to create the mgmt queue");
dnodeCleanupMgmt();
return -1;
}
taosAddIntoQset(tsMgmtQset, tsMgmtQueue, NULL);
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
int32_t code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL);
pthread_attr_destroy(&thAttr);
if (code != 0) {
dError("failed to create thread to process mgmt queue, reason:%s", strerror(errno));
dnodeCleanupMgmt();
return -1;
}
code = dnodeOpenVnodes();
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
dnodeCleanupMgmt();
return -1;
}
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
if (tsDnodeTmr == NULL) {
dError("failed to init dnode timer");
dnodeCleanupMgmt();
return -1; return -1;
} }
...@@ -142,22 +178,54 @@ void dnodeCleanupMgmt() { ...@@ -142,22 +178,54 @@ void dnodeCleanupMgmt() {
} }
dnodeCloseVnodes(); dnodeCloseVnodes();
if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset);
if (tsQthread) pthread_join(tsQthread, NULL);
if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue);
if (tsMgmtQset) taosCloseQset(tsMgmtQset);
tsMgmtQset = NULL;
tsMgmtQueue = NULL;
} }
void dnodeDispatchToDnodeMgmt(SRpcMsg *pMsg) { void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) {
SRpcMsg rsp; void *item;
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { item = taosAllocateQitem(sizeof(SRpcMsg));
rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); memcpy(item, pMsg, sizeof(SRpcMsg));
} else {
rsp.code = TSDB_CODE_MSG_NOT_PROCESSED; taosWriteQitem(tsMgmtQueue, 1, item);
} }
static void *dnodeProcessMgmtQueue(void *param) {
SRpcMsg *pMsg;
SRpcMsg rsp;
int type;
void *handle;
rsp.handle = pMsg->handle; while (1) {
rsp.pCont = NULL; if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) {
rpcSendResponse(&rsp); dTrace("dnode mgmt got no message from qset, exit ...");
break;
}
dTrace("%p, msg:%s will be processed", pMsg->ahandle, taosMsg[pMsg->msgType]);
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else {
rsp.code = TSDB_CODE_MSG_NOT_PROCESSED;
}
rsp.handle = pMsg->handle;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
rpcFreeCont(pMsg->pCont); return NULL;
} }
static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
......
...@@ -43,10 +43,10 @@ int32_t dnodeInitServer() { ...@@ -43,10 +43,10 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVnodeWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToDnodeMgmt; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToDnodeMgmt; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToDnodeMgmt; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToDnodeMgmt; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册